Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 07:08:18 UTC
[cassandra] branch cassandra-3.0 updated: Avoid creating duplicate rows during major upgrades
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 4d42c18 Avoid creating duplicate rows during major upgrades
4d42c18 is described below
commit 4d42c189fa82b32fd93ae42a164b91e4db62992e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon May 4 09:35:54 2020 +0200
Avoid creating duplicate rows during major upgrades
Patch by Aleksey Yeschenko, Sam Tunnicliffe and Marcus Eriksson;
reviewed by Sylvain Lebresne and Alex Petrov for CASSANDRA-15789
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 19 ++
.../cassandra/config/DatabaseDescriptor.java | 31 +++
src/java/org/apache/cassandra/db/LegacyLayout.java | 18 +-
.../db/compaction/CompactionIterator.java | 5 +-
.../db/partitions/AbstractBTreePartition.java | 5 +-
.../db/partitions/ImmutableBTreePartition.java | 2 +-
.../cassandra/db/partitions/PartitionUpdate.java | 25 ++-
.../db/transform/DuplicateRowChecker.java | 139 ++++++++++++
.../org/apache/cassandra/service/ReadCallback.java | 7 +-
.../cassandra/service/SnapshotVerbHandler.java | 5 +
.../org/apache/cassandra/service/StorageProxy.java | 54 +++++
.../cassandra/service/StorageProxyMBean.java | 13 ++
.../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++
.../cassandra/distributed/impl/Instance.java | 4 +-
.../upgrade/MixedModeReadRepairTest.java | 85 ++++++++
.../distributed/upgrade/UpgradeTestBase.java | 3 +-
.../org/apache/cassandra/db/LegacyLayoutTest.java | 39 +++-
.../db/compaction/CompactionIteratorTest.java | 134 ++++++++++++
.../db/partition/PartitionUpdateTest.java | 144 +++++++++++++
.../db/transform/DuplicateRowCheckerTest.java | 240 +++++++++++++++++++++
21 files changed, 1145 insertions(+), 16 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a0a4d5..b875ae1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.21
+ * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
* liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
* Fix Debian init start/stop (CASSANDRA-15770)
* Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index bc3e3bf..6003bd1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -361,6 +361,25 @@ public class Config
outboundBindAny = value;
}
+ /**
+ * If true, when rows with duplicate clustering keys are detected during a read or compaction,
+ * a snapshot will be taken. In the read case, a snapshot request will be issued to each
+ * replica involved in the query; for compaction, the snapshot will be created locally.
+ * These are limited at the replica level so that only a single snapshot per day can be taken
+ * via this method.
+ *
+ * This requires check_for_duplicate_rows_during_reads and/or check_for_duplicate_rows_during_compaction
+ * below to be enabled.
+ */
+ public volatile boolean snapshot_on_duplicate_row_detection = false;
+
+ /**
+ * If these are enabled, duplicate keys will be logged, and if snapshot_on_duplicate_row_detection
+ * is enabled, the table will be snapshotted for offline investigation
+ */
+ public volatile boolean check_for_duplicate_rows_during_reads = true;
+ public volatile boolean check_for_duplicate_rows_during_compaction = true;
+
public static boolean isClientMode()
{
return isClientMode;
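(Aside: the three fields above surface as cassandra.yaml settings of the same name, since
Cassandra maps Config fields to yaml keys directly. A hypothetical excerpt, showing the
defaults from this patch:

    # Log duplicate rows seen during reads / compaction (both default to enabled)
    check_for_duplicate_rows_during_reads: true
    check_for_duplicate_rows_during_compaction: true
    # Additionally snapshot the affected table for offline investigation (default: off)
    snapshot_on_duplicate_row_detection: false

All three fields are volatile, so they can also be flipped at runtime through the
StorageProxy MBean methods added later in this patch.)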
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a161a2a..4b732c2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2164,4 +2164,35 @@ public class DatabaseDescriptor
{
return strictRuntimeChecks;
}
+
+ public static boolean snapshotOnDuplicateRowDetection()
+ {
+ return conf.snapshot_on_duplicate_row_detection;
+ }
+
+ public static void setSnapshotOnDuplicateRowDetection(boolean enabled)
+ {
+ conf.snapshot_on_duplicate_row_detection = enabled;
+ }
+
+ public static boolean checkForDuplicateRowsDuringReads()
+ {
+ return conf.check_for_duplicate_rows_during_reads;
+ }
+
+ public static void setCheckForDuplicateRowsDuringReads(boolean enabled)
+ {
+ conf.check_for_duplicate_rows_during_reads = enabled;
+ }
+
+ public static boolean checkForDuplicateRowsDuringCompaction()
+ {
+ return conf.check_for_duplicate_rows_during_compaction;
+ }
+
+ public static void setCheckForDuplicateRowsDuringCompaction(boolean enabled)
+ {
+ conf.check_for_duplicate_rows_during_compaction = enabled;
+ }
+
}
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 42d50a1..37cc935 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -29,6 +29,8 @@ import java.util.stream.Collectors;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.SuperColumnCompatibility;
import org.apache.cassandra.utils.AbstractIterator;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
@@ -1485,10 +1487,12 @@ public abstract class LegacyLayout
// written, as 2.x storage format does not guarantee just one range tombstone, unlike 3.x.
// We have to make sure that clustering matches, which would mean that tombstone is for the
// same row.
- if (rowDeletion != null && clustering.equals(tombstone.start.getAsClustering(metadata)))
+ if (clustering.equals(tombstone.start.getAsClustering(metadata)))
{
- // If the tombstone superceeds the previous delete, we discard the previous one
- if (tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
+ // If the tombstone supersedes the previous delete, we discard the previous one.
+ // This assumes that we are building the row from a sane source (i.e., this row deletion
+ // does not delete anything already added to the builder). See CASSANDRA-15789 for details.
+ if (rowDeletion == null || tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
{
builder.addRowDeletion(Row.Deletion.regular(tombstone.deletionTime));
rowDeletion = tombstone;
@@ -1497,7 +1501,7 @@ public abstract class LegacyLayout
return true;
}
- // If we're already within a row and there was no delete written before that one, it can't be the same one
+ // different clustering -> new row
return false;
}
@@ -1620,7 +1624,8 @@ public abstract class LegacyLayout
public final ColumnDefinition column;
public final ByteBuffer collectionElement;
- private LegacyCellName(Clustering clustering, ColumnDefinition column, ByteBuffer collectionElement)
+ @VisibleForTesting
+ public LegacyCellName(Clustering clustering, ColumnDefinition column, ByteBuffer collectionElement)
{
this.clustering = clustering;
this.column = column;
@@ -1740,7 +1745,8 @@ public abstract class LegacyLayout
public final int localDeletionTime;
public final int ttl;
- private LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, long timestamp, int localDeletionTime, int ttl)
+ @VisibleForTesting
+ public LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, long timestamp, int localDeletionTime, int ttl)
{
this.kind = kind;
this.name = name;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 39cb2df..b132d90 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -104,7 +106,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
: UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
- this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+ merged = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+ this.compacted = DuplicateRowChecker.duringCompaction(merged, type);
}
public boolean isForThrift()
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 12dbb39..2cd9e97 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -320,10 +320,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
{
- return build(iterator, initialRowCapacity, true);
+ return build(iterator, initialRowCapacity, true, null);
}
- protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
+ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
{
CFMetaData metadata = iterator.metadata();
PartitionColumns columns = iterator.columns();
@@ -331,6 +331,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
builder.auto(!ordered);
+ builder.setQuickResolver(quickResolver);
MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
while (iterator.hasNext())
diff --git a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
index 8d96f1e..8db5ee4 100644
--- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@ -108,7 +108,7 @@ public class ImmutableBTreePartition extends AbstractBTreePartition
*/
public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
{
- return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered));
+ return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered, null));
}
protected Holder holder()
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f476f5b..3560e90 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -36,6 +37,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -214,7 +216,26 @@ public class PartitionUpdate extends AbstractBTreePartition
*/
public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
{
- Holder holder = build(iterator, 16);
+ return fromIterator(iterator, true, null);
+ }
+
+ private static final NoSpamLogger rowMergingLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+ /**
+ * Removes duplicate rows from the incoming iterator; to be used when we can't trust the underlying iterator (e.g. when reading legacy sstables)
+ */
+ public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator)
+ {
+ return fromIterator(iterator, false, (a, b) -> {
+ CFMetaData cfm = iterator.metadata();
+ rowMergingLogger.warn(String.format("Merging rows from pre 3.0 iterator for partition key: %s",
+ cfm.getKeyValidator().getString(iterator.partitionKey().getKey())));
+ return Rows.merge(a, b, FBUtilities.nowInSeconds());
+ });
+ }
+
+ private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
+ {
+ Holder holder = build(iterator, 16, ordered, quickResolver);
MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
}
@@ -746,7 +767,7 @@ public class PartitionUpdate extends AbstractBTreePartition
try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
{
assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
- return PartitionUpdate.fromIterator(iterator);
+ return PartitionUpdate.fromPre30Iterator(iterator);
}
}
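(Aside: the quick resolver above is the crux of the fix on the write path. When the BTree
builder encounters two rows with equal clusterings, instead of keeping both it folds them
into one via Rows.merge. A toy analogue of that resolve-on-collision behaviour, using
java.util.TreeMap rather than Cassandra's BTree.Builder:

    import java.util.TreeMap;

    public class QuickResolverSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical stand-in: rows keyed by clustering value; "merging" here just
            // concatenates, where the real resolver reconciles cells and deletions.
            TreeMap<Integer, String> rows = new TreeMap<>();
            rows.merge(2, "row deletion @ ts 1", (a, b) -> a + " + " + b);
            rows.merge(2, "cell @ ts 3", (a, b) -> a + " + " + b);
            System.out.println(rows); // {2=row deletion @ ts 1 + cell @ ts 3} -- a single row
        }
    }

The NoSpamLogger above keeps the accompanying warning to at most once a minute, since a
single legacy partition can trigger the resolver many times.)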
diff --git a/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java b/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
new file mode 100644
index 0000000..7a6f7f9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.transform;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DuplicateRowChecker extends Transformation<BaseRowIterator<?>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(DuplicateRowChecker.class);
+
+ Clustering previous = null;
+ int duplicatesDetected = 0;
+
+ final String stage;
+ final List<InetAddress> replicas;
+ final CFMetaData metadata;
+ final DecoratedKey key;
+ final boolean snapshotOnDuplicate;
+
+ DuplicateRowChecker(final DecoratedKey key,
+ final CFMetaData metadata,
+ final String stage,
+ final boolean snapshotOnDuplicate,
+ final List<InetAddress> replicas)
+ {
+ this.key = key;
+ this.metadata = metadata;
+ this.stage = stage;
+ this.snapshotOnDuplicate = snapshotOnDuplicate;
+ this.replicas = replicas;
+ }
+
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ return deletionTime;
+ }
+
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return marker;
+ }
+
+ protected Row applyToStatic(Row row)
+ {
+ return row;
+ }
+
+ protected Row applyToRow(Row row)
+ {
+ if (null != previous && row.clustering().equals(previous))
+ duplicatesDetected++;
+ previous = row.clustering();
+ return row;
+ }
+
+ protected void onPartitionClose()
+ {
+ if (duplicatesDetected > 0)
+ {
+ logger.warn("Detected {} duplicate rows for {} during {}",
+ duplicatesDetected,
+ metadata.getKeyValidator().getString(key.getKey()),
+ stage);
+ if (snapshotOnDuplicate)
+ DiagnosticSnapshotService.duplicateRows(metadata, replicas);
+ }
+ duplicatesDetected = 0;
+ previous = null;
+ super.onPartitionClose();
+ }
+
+ public static UnfilteredPartitionIterator duringCompaction(final UnfilteredPartitionIterator iterator, OperationType type)
+ {
+ if (!DatabaseDescriptor.checkForDuplicateRowsDuringCompaction())
+ return iterator;
+ final List<InetAddress> address = Collections.singletonList(FBUtilities.getBroadcastAddress());
+ final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+ return Transformation.apply(iterator, new Transformation<UnfilteredRowIterator>()
+ {
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+ partition.metadata(),
+ type.toString(),
+ snapshot,
+ address));
+ }
+ });
+ }
+
+ public static PartitionIterator duringRead(final PartitionIterator iterator, final List<InetAddress> replicas)
+ {
+ if (!DatabaseDescriptor.checkForDuplicateRowsDuringReads())
+ return iterator;
+ final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+ return Transformation.apply(iterator, new Transformation<RowIterator>()
+ {
+ protected RowIterator applyToPartition(RowIterator partition)
+ {
+ return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+ partition.metadata(),
+ "Read",
+ snapshot,
+ replicas));
+ }
+ });
+ }
+}
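(Aside: the checker leans on the invariant that rows in a partition are delivered in
clustering order, so duplicates are always adjacent and a single "previous" reference is
enough; no set of seen clusterings is needed. A standalone sketch of that counting logic:

    import java.util.Arrays;
    import java.util.List;

    public class AdjacentDuplicateSketch
    {
        public static void main(String[] args)
        {
            // Stand-in for clusterings as an UnfilteredRowIterator would yield them: sorted.
            List<Integer> clusterings = Arrays.asList(0, 1, 1, 2, 2, 2);
            Integer previous = null;
            int duplicatesDetected = 0;
            for (Integer clustering : clusterings)
            {
                if (previous != null && clustering.equals(previous))
                    duplicatesDetected++;
                previous = clustering;
            }
            System.out.println(duplicatesDetected); // 3: one per extra copy of a clustering
        }
    }

Note that adjacency holds for both forward and reverse iteration, which is why the unit
tests added below cover both orders.)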
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 516384a..71eb0bc 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -32,6 +32,10 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
@@ -132,6 +136,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
: new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
}
+
public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
{
awaitResults();
@@ -139,7 +144,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
if (logger.isTraceEnabled())
logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- return result;
+ return DuplicateRowChecker.duringRead(result, endpoints);
}
public int blockFor()
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index a997533..179abeb 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
{
@@ -38,6 +39,10 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
{
Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
}
+ else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
+ {
+ DiagnosticSnapshotService.snapshot(command, message.from);
+ }
else
Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31898e6..19cd901 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2685,4 +2685,58 @@ public class StorageProxy implements StorageProxyMBean
public void setOtcBacklogExpirationInterval(int intervalInMillis) {
DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
}
+
+ @Override
+ public boolean getSnapshotOnDuplicateRowDetectionEnabled()
+ {
+ return DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+ }
+
+ @Override
+ public void enableSnapshotOnDuplicateRowDetection()
+ {
+ DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+ }
+
+ @Override
+ public void disableSnapshotOnDuplicateRowDetection()
+ {
+ DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false);
+ }
+
+ @Override
+ public boolean getCheckForDuplicateRowsDuringReads()
+ {
+ return DatabaseDescriptor.checkForDuplicateRowsDuringReads();
+ }
+
+ @Override
+ public void enableCheckForDuplicateRowsDuringReads()
+ {
+ DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
+ }
+
+ @Override
+ public void disableCheckForDuplicateRowsDuringReads()
+ {
+ DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false);
+ }
+
+ @Override
+ public boolean getCheckForDuplicateRowsDuringCompaction()
+ {
+ return DatabaseDescriptor.checkForDuplicateRowsDuringCompaction();
+ }
+
+ @Override
+ public void enableCheckForDuplicateRowsDuringCompaction()
+ {
+ DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
+ }
+
+ @Override
+ public void disableCheckForDuplicateRowsDuringCompaction()
+ {
+ DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ee82a5b..047934c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
public interface StorageProxyMBean
{
public long getTotalHints();
@@ -64,4 +66,15 @@ public interface StorageProxyMBean
/** Returns each live node's schema version */
public Map<String, List<String>> getSchemaVersions();
+
+ void enableSnapshotOnDuplicateRowDetection();
+ void disableSnapshotOnDuplicateRowDetection();
+ boolean getSnapshotOnDuplicateRowDetectionEnabled();
+
+ boolean getCheckForDuplicateRowsDuringReads();
+ void enableCheckForDuplicateRowsDuringReads();
+ void disableCheckForDuplicateRowsDuringReads();
+ boolean getCheckForDuplicateRowsDuringCompaction();
+ void enableCheckForDuplicateRowsDuringCompaction();
+ void disableCheckForDuplicateRowsDuringCompaction();
}
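(Aside: these MBean methods make the checks and the snapshot trigger switchable at runtime,
without a restart or yaml edit. A minimal sketch of flipping the snapshot switch over JMX,
assuming a locally reachable node with the default JMX port 7199:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.service.StorageProxyMBean;

    public class EnableDuplicateRowSnapshots
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                // StorageProxy's registered MBean name.
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageProxy");
                StorageProxyMBean proxy = JMX.newMBeanProxy(connection, name, StorageProxyMBean.class);
                proxy.enableSnapshotOnDuplicateRowDetection();
                System.out.println("snapshot on duplicate rows: " + proxy.getSnapshotOnDuplicateRowDetectionEnabled());
            }
        }
    }

The enable/disable/get triples follow the convention used elsewhere in this interface rather
than a single boolean setter.)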
diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
new file mode 100644
index 0000000..5c48412
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.net.InetAddress;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Provides a means to take snapshots when triggered by anomalous events or when the breaking of invariants is
+ * detected. When debugging certain classes of problems, having access to the relevant set of sstables when the problem
+ * is detected (or as close to then as possible) can be invaluable.
+ *
+ * This class performs two functions: on a replica where an anomaly is detected, it provides methods to issue snapshot
+ * requests to a provided set of replicas. For instance, if rows with duplicate clusterings are detected
+ * (CASSANDRA-15789) during a read, a snapshot request will be issued to all participating replicas. If detected during
+ * compaction, only the replica itself will receive the request. Requests are issued at a maximum rate of 1 per minute
+ * for any given table. Any additional triggers for the same table during the 60 second window are dropped, regardless
+ * of the replica set. This window is configurable via a system property (cassandra.diagnostic_snapshot_interval_nanos),
+ * but this is intended for use in testing only and operators are not expected to override the default.
+ *
+ * The second function is to handle snapshot requests on replicas. Snapshot names are prefixed with strings
+ * specific to the reason that triggered them. To manage consumption of disk space, replicas are restricted to taking
+ * a single snapshot for each prefix in a single calendar day. So if duplicate rows are detected by multiple
+ * coordinators during reads with the same replica set (or overlapping sets) on the same table, the coordinators may
+ * each issue snapshot requests, but the replicas will only accept the first one they receive. Further requests will
+ * be dropped on the replica side.
+ */
+public class DiagnosticSnapshotService
+{
+ private static final Logger logger = LoggerFactory.getLogger(DiagnosticSnapshotService.class);
+
+ public static final DiagnosticSnapshotService instance =
+ new DiagnosticSnapshotService(Executors.newSingleThreadExecutor(new NamedThreadFactory("DiagnosticSnapshot")));
+
+ public static final String DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX = "DuplicateRows-";
+
+ private final Executor executor;
+
+ private DiagnosticSnapshotService(Executor executor)
+ {
+ this.executor = executor;
+ }
+
+ // Issue at most 1 snapshot request per minute for any given table.
+ // Replicas will only create one snapshot per day, but this stops us
+ // from swamping the network.
+ // Overridable via system property for testing.
+ private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
+ private final ConcurrentHashMap<UUID, AtomicLong> lastSnapshotTimes = new ConcurrentHashMap<>();
+
+ public static void duplicateRows(CFMetaData metadata, Iterable<InetAddress> replicas)
+ {
+ instance.maybeTriggerSnapshot(metadata, DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas);
+ }
+
+ public static boolean isDiagnosticSnapshotRequest(SnapshotCommand command)
+ {
+ return command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
+ }
+
+ public static void snapshot(SnapshotCommand command, InetAddress initiator)
+ {
+ Preconditions.checkArgument(isDiagnosticSnapshotRequest(command));
+ instance.maybeSnapshot(command, initiator);
+ }
+
+ public static String getSnapshotName(String prefix)
+ {
+ return String.format("%s%s", prefix, DATE_FORMAT.format(LocalDate.now()));
+ }
+
+ @VisibleForTesting
+ public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+ {
+ ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
+ }
+
+ private void maybeTriggerSnapshot(CFMetaData metadata, String prefix, Iterable<InetAddress> endpoints)
+ {
+ long now = System.nanoTime();
+ AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.cfId, u -> new AtomicLong(0));
+ long last = cached.get();
+ long interval = Long.getLong("cassandra.diagnostic_snapshot_interval_nanos", SNAPSHOT_INTERVAL_NANOS);
+ if (now - last > interval && cached.compareAndSet(last, now))
+ {
+ MessageOut<?> msg = new SnapshotCommand(metadata.ksName,
+ metadata.cfName,
+ getSnapshotName(prefix),
+ false).createMessage();
+ for (InetAddress replica : endpoints)
+ MessagingService.instance().sendOneWay(msg, replica);
+ }
+ else
+ {
+ logger.debug("Diagnostic snapshot request dropped due to throttling");
+ }
+ }
+
+ private void maybeSnapshot(SnapshotCommand command, InetAddress initiator)
+ {
+ executor.execute(new DiagnosticSnapshotTask(command, initiator));
+ }
+
+ private static class DiagnosticSnapshotTask implements Runnable
+ {
+ final SnapshotCommand command;
+ final InetAddress from;
+
+ DiagnosticSnapshotTask(SnapshotCommand command, InetAddress from)
+ {
+ this.command = command;
+ this.from = from;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Keyspace ks = Keyspace.open(command.keyspace);
+ if (ks == null)
+ {
+ logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
+ from,
+ command.keyspace,
+ command.column_family);
+ return;
+ }
+
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
+ if (cfs.snapshotExists(command.snapshot_name))
+ {
+ logger.info("Received diagnostic snapshot request from {} for {}.{}, " +
+ "but snapshot with tag {} already exists",
+ from,
+ command.keyspace,
+ command.column_family,
+ command.snapshot_name);
+ return;
+ }
+ logger.info("Creating snapshot requested by {} of {}.{} tag: {}",
+ from,
+ command.keyspace,
+ command.column_family,
+ command.snapshot_name);
+ cfs.snapshot(command.snapshot_name);
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.warn("Snapshot request received from {} for {}.{} but CFS not found",
+ from,
+ command.keyspace,
+ command.column_family);
+ }
+ }
+ }
+}
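(Aside: the per-day dedup on the replica side falls out of the snapshot name itself, since
getSnapshotName() stamps the prefix with BASIC_ISO_DATE and snapshotExists() rejects repeats
of the same tag. A standalone illustration of the resulting tag format:

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class SnapshotNameSketch
    {
        public static void main(String[] args)
        {
            // Mirrors getSnapshotName() above; BASIC_ISO_DATE renders a LocalDate as yyyyMMdd,
            // so every request for the same table on the same day produces the same tag.
            String prefix = "DuplicateRows-";
            String name = prefix + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.of(2020, 5, 20));
            System.out.println(name); // DuplicateRows-20200520
        }
    }

Coordinator-side throttling is separate: the compareAndSet on the per-table AtomicLong caps
outgoing requests at one per minute regardless of how many reads or compactions trip the check.)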
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ed14404..d23eec0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -99,6 +99,7 @@ import org.apache.cassandra.tools.NodeTool;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
@@ -700,7 +701,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
() -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
() -> Ref.shutdownReferenceReaper(1L, MINUTES),
() -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
- () -> SSTableReader.shutdownBlocking(1L, MINUTES)
+ () -> SSTableReader.shutdownBlocking(1L, MINUTES),
+ () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
);
error = parallelRun(error, executor,
() -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index cc50053..e9391e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -18,12 +18,19 @@
package org.apache.cassandra.distributed.upgrade;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
import org.junit.Test;
+import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
+import static org.junit.Assert.fail;
+
public class MixedModeReadRepairTest extends UpgradeTestBase
{
@Test
@@ -49,4 +56,82 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
.runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
.run();
}
+
+ @Test
+ public void mixedModeReadRepairDuplicateRows() throws Throwable
+ {
+ final String[] workload1 = new String[]
+ {
+ "DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 1 AND ck = 2;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, {'a':'b'}) USING TIMESTAMP 3;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'c':'d'}) USING TIMESTAMP 3;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, {'e':'f'}) USING TIMESTAMP 3;",
+ };
+
+ final String[] workload2 = new String[]
+ {
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'g':'h'}) USING TIMESTAMP 5;",
+ };
+
+ new TestCase()
+ .nodes(2)
+ .upgrade(Versions.Major.v22, Versions.Major.v30)
+ .setup((cluster) ->
+ {
+ cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck));");
+ })
+ .runAfterNodeUpgrade((cluster, node) ->
+ {
+ if (node == 2)
+ return;
+
+ // now node1 is 3.0 and node2 is 2.2
+ for (int i = 0; i < workload1.length; i++ )
+ cluster.coordinator(2).execute(workload1[i], ConsistencyLevel.QUORUM);
+
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).flush(KEYSPACE);
+
+ validate(cluster, 2, false);
+
+ for (int i = 0; i < workload2.length; i++ )
+ cluster.coordinator(2).execute(workload2[i], ConsistencyLevel.QUORUM);
+
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).flush(KEYSPACE);
+
+ validate(cluster, 1, true);
+ })
+ .run();
+ }
+
+ private void validate(UpgradeableCluster cluster, int nodeid, boolean local)
+ {
+ String query = "SELECT * FROM " + KEYSPACE + ".tbl";
+
+ Iterator<Object[]> iter = local
+ ? Iterators.forArray(cluster.get(nodeid).executeInternal(query))
+ : cluster.coordinator(nodeid).executeWithPaging(query, ConsistencyLevel.ALL, 2);
+
+ Object[] prevRow = null;
+ Object prevClustering = null;
+
+ while (iter.hasNext())
+ {
+ Object[] row = iter.next();
+ Object clustering = row[1];
+
+ if (clustering.equals(prevClustering))
+ {
+ fail(String.format("Duplicate rows on node %d in %s mode: \n%s\n%s",
+ nodeid,
+ local ? "local" : "distributed",
+ Arrays.toString(prevRow),
+ Arrays.toString(row)));
+ }
+
+ prevRow = row;
+ prevClustering = clustering;
+ }
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 3567453..8de3e4c 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.Builder;
@@ -174,7 +175,7 @@ public class UpgradeTestBase extends DistributedTestBase
for (Version version : upgrade.upgrade)
{
- for (int n : nodesToUpgrade)
+ for (int n=1; n<=nodesToUpgrade.size(); n++)
{
cluster.get(n).shutdown().get();
cluster.get(n).setVersion(version);
diff --git a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index 7f3b424..0bb2459 100644
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@ -25,6 +25,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
@@ -370,4 +372,39 @@ public class LegacyLayoutTest
LegacyLayout.fromUnfilteredRowIterator(null, p.unfilteredIterator());
LegacyLayout.serializedSizeAsLegacyPartition(null, p.unfilteredIterator(), VERSION_21);
}
-}
\ No newline at end of file
+
+ @Test
+ public void testCellGrouper()
+ {
+ // CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))
+ CFMetaData cfm = CFMetaData.Builder.create("ks", "table")
+ .addPartitionKey("pk", Int32Type.instance)
+ .addClusteringColumn("ck", Int32Type.instance)
+ .addRegularColumn("v", MapType.getInstance(UTF8Type.instance, UTF8Type.instance, true))
+ .build();
+ SerializationHelper helper = new SerializationHelper(cfm, MessagingService.VERSION_22, SerializationHelper.Flag.LOCAL, ColumnFilter.all(cfm));
+ LegacyLayout.CellGrouper cg = new LegacyLayout.CellGrouper(cfm, helper);
+
+ Slice.Bound startBound = Slice.Bound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+ Slice.Bound endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+ LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+ LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+ LegacyLayout.LegacyRangeTombstone lrt = new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(2, 1588598040));
+ assertTrue(cg.addAtom(lrt));
+
+ // add a real cell
+ LegacyLayout.LegacyCell cell = new LegacyLayout.LegacyCell(LegacyLayout.LegacyCell.Kind.REGULAR,
+ new LegacyLayout.LegacyCellName(new Clustering(ByteBufferUtil.bytes(2)),
+ cfm.getColumnDefinition(ByteBufferUtil.bytes("v")),
+ ByteBufferUtil.bytes("g")),
+ ByteBufferUtil.bytes("v"), 3, Integer.MAX_VALUE, 0);
+ assertTrue(cg.addAtom(cell));
+
+ // add legacy range tombstone where collection name is null for the end bound (this gets translated to a row tombstone)
+ startBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+ endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+ start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+ end = new LegacyLayout.LegacyBound(endBound, false, null);
+ assertTrue(cg.addAtom(new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(1, 1588598040))));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
new file mode 100644
index 0000000..549a94d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued;
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.iter;
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CompactionIteratorTest extends CQLTester
+{
+ @Test
+ public void duplicateRowsTest() throws Throwable
+ {
+ System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+ // Create a table and insert some data. The actual rows read in the test will be synthetic
+ // but this creates an sstable on disk to be snapshotted.
+ createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+ for (int i = 0; i < 10; i++)
+ execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+ flush();
+
+ DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
+
+ final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>();
+ IMessageSink sink = new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ sentMessages.put(to, message);
+ return false;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return false;
+ }
+ };
+ MessagingService.instance().addMessageSink(sink);
+
+ // no duplicates
+ sentMessages.clear();
+ iterate(cfs, iter(metadata,
+ false,
+ makeRow(metadata,0, 0),
+ makeRow(metadata,0, 1),
+ makeRow(metadata,0, 2)));
+ assertCommandIssued(sentMessages, false);
+
+ // now test with a duplicate row and see that we issue a snapshot command
+ sentMessages.clear();
+ iterate(cfs, iter(metadata,
+ false,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ private void iterate(ColumnFamilyStore cfs, UnfilteredPartitionIterator partitions)
+ {
+
+ try (CompactionController controller = new CompactionController(getCurrentColumnFamilyStore(), Integer.MAX_VALUE);
+ ISSTableScanner scanner = scanner(cfs, partitions);
+ CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+ Collections.singletonList(scanner),
+ controller, FBUtilities.nowInSeconds(), null))
+ {
+ while (iter.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ partition.forEachRemaining(u -> {});
+ }
+ }
+ }
+ }
+
+ private ISSTableScanner scanner(final ColumnFamilyStore cfs, final UnfilteredPartitionIterator partitions)
+ {
+
+ return new ISSTableScanner()
+ {
+ public long getLengthInBytes() { return 0; }
+
+ public long getCurrentPosition() { return 0; }
+
+ public String getBackingFiles() { return cfs.getLiveSSTables().iterator().next().toString(); }
+
+ public boolean isForThrift() { return false; }
+
+ public CFMetaData metadata() { return cfs.metadata; }
+
+ public void close() { }
+
+ public boolean hasNext() { return partitions.hasNext(); }
+
+ public UnfilteredRowIterator next() { return partitions.next(); }
+ };
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 7dff91f..2bd685c 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@ -17,15 +17,42 @@
*/
package org.apache.cassandra.db.partition;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowAndDeletionMergeIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
import junit.framework.Assert;
+import static org.junit.Assert.assertEquals;
+
+
public class PartitionUpdateTest extends CQLTester
{
@Test
@@ -85,4 +112,121 @@ public class PartitionUpdateTest extends CQLTester
update = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), "key0").buildUpdate();
Assert.assertEquals(0, update.operationCount());
}
+
+ /**
+ * Makes sure we merge duplicate rows, see CASSANDRA-15789
+ */
+ @Test
+ public void testDuplicate()
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+ CFMetaData cfm = currentTableMetadata();
+
+ DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+
+ List<Row> rows = new ArrayList<>();
+ Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+ builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+ builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
+
+ Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+ builder.addCell(c);
+
+ Row r = builder.build();
+ rows.add(r);
+
+ builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+ builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+ r = builder.build();
+ rows.add(r);
+
+ RowAndDeletionMergeIterator rmi = new RowAndDeletionMergeIterator(cfm,
+ dk,
+ DeletionTime.LIVE,
+ ColumnFilter.all(cfm),
+ Rows.EMPTY_STATIC_ROW,
+ false,
+ EncodingStats.NO_STATS,
+ rows.iterator(),
+ Collections.emptyIterator(),
+ true);
+
+ PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi);
+ pu.iterator();
+
+ Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+ m.add(pu);
+ m.apply();
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+
+ SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+ int count = 0;
+ try (ISSTableScanner scanner = sst.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator iter = scanner.next())
+ {
+ while (iter.hasNext())
+ {
+ iter.next();
+ count++;
+ }
+ }
+ }
+ }
+ assertEquals(1, count);
+ }
+
+ /**
+ * Makes sure we don't create duplicates when merging 2 partition updates
+ */
+ @Test
+ public void testMerge()
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+ CFMetaData cfm = currentTableMetadata();
+
+ DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+ builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+ builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
+ Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+ builder.addCell(c);
+ Row r = builder.build();
+
+ PartitionUpdate p1 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+ p1.add(r);
+
+ builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+ builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+ r = builder.build();
+ PartitionUpdate p2 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+ p2.add(r);
+
+ Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+ m.add(PartitionUpdate.merge(Lists.newArrayList(p1, p2)));
+ m.apply();
+
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+
+ SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+ int count = 0;
+ try (ISSTableScanner scanner = sst.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator iter = scanner.next())
+ {
+ while (iter.hasNext())
+ {
+ iter.next();
+ count++;
+ }
+ }
+ }
+ }
+ assertEquals(1, count);
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
new file mode 100644
index 0000000..78a0c8c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.transform;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import org.junit.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DuplicateRowCheckerTest extends CQLTester
+{
+ ColumnFamilyStore cfs;
+ CFMetaData metadata;
+ static HashMap<InetAddress, MessageOut> sentMessages;
+
+ @BeforeClass
+ public static void setupMessaging()
+ {
+ sentMessages = new HashMap<>();
+ IMessageSink sink = new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ sentMessages.put(to, message);
+ return false;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return false;
+ }
+ };
+ MessagingService.instance().addMessageSink(sink);
+ }
+
+ @Before
+ public void setup() throws Throwable
+ {
+ DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+ System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+ // Create a table and insert some data. The actual rows read in the test will be synthetic
+ // but this creates an sstable on disk to be snapshotted.
+ createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+ for (int i = 0; i < 10; i++)
+ execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+
+ metadata = getCurrentColumnFamilyStore().metadata;
+ cfs = getCurrentColumnFamilyStore();
+ sentMessages.clear();
+ }
+
+ @Test
+ public void noDuplicates()
+ {
+ // no duplicates
+ iterate(iter(metadata,
+ false,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 2)));
+ assertCommandIssued(sentMessages, false);
+ }
+
+ @Test
+ public void singleDuplicateForward()
+ {
+
+ iterate(iter(metadata,
+ false,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ @Test
+ public void singleDuplicateReverse()
+ {
+ iterate(iter(metadata,
+ true,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ @Test
+ public void multipleContiguousForward()
+ {
+ iterate(iter(metadata,
+ false,
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ @Test
+ public void multipleContiguousReverse()
+ {
+ iterate(iter(metadata,
+ true,
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 1)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ @Test
+ public void multipleDisjointForward()
+ {
+ iterate(iter(metadata,
+ false,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 2),
+ makeRow(metadata, 0, 2)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ @Test
+ public void multipleDisjointReverse()
+ {
+ iterate(iter(metadata,
+ true,
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 0),
+ makeRow(metadata, 0, 1),
+ makeRow(metadata, 0, 2),
+ makeRow(metadata, 0, 2)));
+ assertCommandIssued(sentMessages, true);
+ }
+
+ public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected)
+ {
+ assertEquals(isExpected, !sent.isEmpty());
+ if (isExpected)
+ {
+ assertEquals(1, sent.size());
+ assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress()));
+ SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload;
+ assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX));
+ }
+ }
+
+ private void iterate(UnfilteredPartitionIterator iter)
+ {
+ try (PartitionIterator partitions = applyChecker(iter))
+ {
+ while (partitions.hasNext())
+ {
+ try (RowIterator partition = partitions.next())
+ {
+ partition.forEachRemaining(u -> {});
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+ {
+ return ((AbstractType<T>) type).decompose(value);
+ }
+
+ public static Row makeRow(CFMetaData metadata, Object... clusteringValues)
+ {
+ ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+ for (int i = 0; i < clusteringValues.length; i++)
+ clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+ return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, 0, 0));
+ }
+
+ private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered)
+ {
+ int nowInSecs = 0;
+ return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs),
+ Collections.singletonList(FBUtilities.getBroadcastAddress()));
+ }
+
+ public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds)
+ {
+ DecoratedKey key = metadata.partitioner.decorateKey(bytes("key"));
+ Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+ UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(metadata,
+ key,
+ DeletionTime.LIVE,
+ metadata.partitionColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ isReversedOrder,
+ EncodingStats.NO_STATS)
+ {
+ protected Unfiltered computeNext()
+ {
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+ };
+
+ return new SingletonUnfilteredPartitionIterator(rowIter, false);
+ }
+}