Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 07:08:18 UTC

[cassandra] branch cassandra-3.0 updated: Avoid creating duplicate rows during major upgrades

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 4d42c18  Avoid creating duplicate rows during major upgrades
4d42c18 is described below

commit 4d42c189fa82b32fd93ae42a164b91e4db62992e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon May 4 09:35:54 2020 +0200

    Avoid creating duplicate rows during major upgrades
    
    Patch by Aleksey Yeschenko, Sam Tunnicliffe and Marcus Eriksson;
    reviewed by Sylvain Lebresne and Alex Petrov for CASSANDRA-15789
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  19 ++
 .../cassandra/config/DatabaseDescriptor.java       |  31 +++
 src/java/org/apache/cassandra/db/LegacyLayout.java |  18 +-
 .../db/compaction/CompactionIterator.java          |   5 +-
 .../db/partitions/AbstractBTreePartition.java      |   5 +-
 .../db/partitions/ImmutableBTreePartition.java     |   2 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |  25 ++-
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../org/apache/cassandra/service/ReadCallback.java |   7 +-
 .../cassandra/service/SnapshotVerbHandler.java     |   5 +
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  13 ++
 .../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |  85 ++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   3 +-
 .../org/apache/cassandra/db/LegacyLayoutTest.java  |  39 +++-
 .../db/compaction/CompactionIteratorTest.java      | 134 ++++++++++++
 .../db/partition/PartitionUpdateTest.java          | 144 +++++++++++++
 .../db/transform/DuplicateRowCheckerTest.java      | 240 +++++++++++++++++++++
 21 files changed, 1145 insertions(+), 16 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0a0a4d5..b875ae1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
  * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
  * Fix Debian init start/stop (CASSANDRA-15770)
  * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index bc3e3bf..6003bd1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -361,6 +361,25 @@ public class Config
         outboundBindAny = value;
     }
 
+    /**
+     * If true, a snapshot will be taken when rows with duplicate clustering keys are detected
+     * during a read or compaction. In the read case, a snapshot request will be issued to each
+     * replica involved in the query; for compaction, the snapshot will be created locally.
+     * Snapshots are limited at the replica level so that only a single snapshot per day can be
+     * taken via this method.
+     *
+     * This requires check_for_duplicate_rows_during_reads and/or check_for_duplicate_rows_during_compaction
+     * below to be enabled.
+     */
+    public volatile boolean snapshot_on_duplicate_row_detection = false;
+
+    /**
+     * If these are enabled, duplicate keys will be logged, and if snapshot_on_duplicate_row_detection
+     * is enabled, the table will be snapshotted for offline investigation.
+     */
+    public volatile boolean check_for_duplicate_rows_during_reads = true;
+    public volatile boolean check_for_duplicate_rows_during_compaction = true;
+
     public static boolean isClientMode()
     {
         return isClientMode;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a161a2a..4b732c2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2164,4 +2164,35 @@ public class DatabaseDescriptor
     {
         return strictRuntimeChecks;
     }
+
+    public static boolean snapshotOnDuplicateRowDetection()
+    {
+        return conf.snapshot_on_duplicate_row_detection;
+    }
+
+    public static void setSnapshotOnDuplicateRowDetection(boolean enabled)
+    {
+        conf.snapshot_on_duplicate_row_detection = enabled;
+    }
+
+    public static boolean checkForDuplicateRowsDuringReads()
+    {
+        return conf.check_for_duplicate_rows_during_reads;
+    }
+
+    public static void setCheckForDuplicateRowsDuringReads(boolean enabled)
+    {
+        conf.check_for_duplicate_rows_during_reads = enabled;
+    }
+
+    public static boolean checkForDuplicateRowsDuringCompaction()
+    {
+        return conf.check_for_duplicate_rows_during_compaction;
+    }
+
+    public static void setCheckForDuplicateRowsDuringCompaction(boolean enabled)
+    {
+        conf.check_for_duplicate_rows_during_compaction = enabled;
+    }
+
 }
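
The three Config fields above double as cassandra.yaml keys of the same names, and the DatabaseDescriptor accessors added here make them flippable at runtime, which is how the unit tests in this patch drive them. A minimal sketch of toggling them programmatically (illustrative class name; assumes the node's configuration has already been loaded, e.g. in a test or embedded context):

    import org.apache.cassandra.config.DatabaseDescriptor;

    public class DuplicateRowCheckToggles
    {
        public static void main(String[] args)
        {
            // Enable detection during both reads and compaction, and snapshot on detection.
            DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
            DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
            DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);

            assert DatabaseDescriptor.snapshotOnDuplicateRowDetection();
        }
    }
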
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 42d50a1..37cc935 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -29,6 +29,8 @@ import java.util.stream.Collectors;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.SuperColumnCompatibility;
 import org.apache.cassandra.utils.AbstractIterator;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
@@ -1485,10 +1487,12 @@ public abstract class LegacyLayout
                 // written, as 2.x storage format does not guarantee just one range tombstone, unlike 3.x.
                 // We have to make sure that clustering matches, which would mean that tombstone is for the
                 // same row.
-                if (rowDeletion != null && clustering.equals(tombstone.start.getAsClustering(metadata)))
+                if (clustering.equals(tombstone.start.getAsClustering(metadata)))
                 {
-                    // If the tombstone superceeds the previous delete, we discard the previous one
-                    if (tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
+                    // If the tombstone supersedes the previous delete, we discard the previous one.
+                    // This assumes that we are building the row from a sane source (i.e. this row deletion
+                    // does not delete anything already added to the builder). See CASSANDRA-15789 for details.
+                    if (rowDeletion == null || tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
                     {
                         builder.addRowDeletion(Row.Deletion.regular(tombstone.deletionTime));
                         rowDeletion = tombstone;
@@ -1497,7 +1501,7 @@ public abstract class LegacyLayout
                     return true;
                 }
 
-                // If we're already within a row and there was no delete written before that one, it can't be the same one
+                // different clustering -> new row
                 return false;
             }
 
@@ -1620,7 +1624,8 @@ public abstract class LegacyLayout
         public final ColumnDefinition column;
         public final ByteBuffer collectionElement;
 
-        private LegacyCellName(Clustering clustering, ColumnDefinition column, ByteBuffer collectionElement)
+        @VisibleForTesting
+        public LegacyCellName(Clustering clustering, ColumnDefinition column, ByteBuffer collectionElement)
         {
             this.clustering = clustering;
             this.column = column;
@@ -1740,7 +1745,8 @@ public abstract class LegacyLayout
         public final int localDeletionTime;
         public final int ttl;
 
-        private LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, long timestamp, int localDeletionTime, int ttl)
+        @VisibleForTesting
+        public LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, long timestamp, int localDeletionTime, int ttl)
         {
             this.kind = kind;
             this.name = name;
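
The CellGrouper change above is the heart of the fix: a legacy row-level range tombstone whose clustering matches the row being built is now absorbed into that row even when no earlier row deletion was seen (rowDeletion == null), instead of falling through to "new row" and splitting one logical row in two. A self-contained sketch of the corrected guard, using stand-in types rather than Cassandra's own (the supersedes tie-break here is this sketch's assumption):

    public class RowTombstoneGuardSketch
    {
        static final class DeletionTime
        {
            final long markedForDeleteAt;
            final int localDeletionTime;

            DeletionTime(long markedForDeleteAt, int localDeletionTime)
            {
                this.markedForDeleteAt = markedForDeleteAt;
                this.localDeletionTime = localDeletionTime;
            }

            // Newer deletions win; equal timestamps fall back to local deletion time.
            boolean supersedes(DeletionTime other)
            {
                return markedForDeleteAt > other.markedForDeleteAt
                       || (markedForDeleteAt == other.markedForDeleteAt
                           && localDeletionTime > other.localDeletionTime);
            }
        }

        DeletionTime rowDeletion = null;

        // Returns true when the tombstone belongs to the current row, so the grouper
        // keeps accumulating into it rather than starting a duplicate row.
        boolean absorbRowTombstone(boolean clusteringMatches, DeletionTime tombstone)
        {
            if (!clusteringMatches)
                return false; // different clustering -> genuinely a new row

            // Pre-patch, the rowDeletion == null case fell through to "new row".
            if (rowDeletion == null || tombstone.supersedes(rowDeletion))
                rowDeletion = tombstone;
            return true;
        }

        public static void main(String[] args)
        {
            RowTombstoneGuardSketch grouper = new RowTombstoneGuardSketch();
            // Matching clustering with no prior row deletion: stays in the same row.
            System.out.println(grouper.absorbRowTombstone(true, new DeletionTime(2, 1588598040))); // true
        }
    }
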
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 39cb2df..b132d90 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -104,7 +106,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                                              ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
                                              : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
         boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
-        this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+        merged = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+        this.compacted = DuplicateRowChecker.duringCompaction(merged, type);
     }
 
     public boolean isForThrift()
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 12dbb39..2cd9e97 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -320,10 +320,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 
     protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
     {
-        return build(iterator, initialRowCapacity, true);
+        return build(iterator, initialRowCapacity, true, null);
     }
 
-    protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
+    protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
     {
         CFMetaData metadata = iterator.metadata();
         PartitionColumns columns = iterator.columns();
@@ -331,6 +331,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 
         BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
         builder.auto(!ordered);
+        builder.setQuickResolver(quickResolver);
         MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
 
         while (iterator.hasNext())
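
The new quickResolver parameter threaded through build() is what lets a caller merge, rather than keep, rows that compare equal while the BTree is being built. Conceptually it is just a binary operator invoked whenever two entries with equal keys meet; a hypothetical sketch of the idea over a plain sorted list (illustrative names, not the actual BTree.Builder API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.function.BinaryOperator;

    public class QuickResolverSketch
    {
        // Builds a sorted, duplicate-free list, resolving equal neighbours instead of
        // inserting them twice (the input is assumed already sorted, as in the patch).
        static <V> List<V> buildResolved(List<V> input, Comparator<V> cmp, BinaryOperator<V> resolver)
        {
            List<V> out = new ArrayList<>();
            for (V v : input)
            {
                int last = out.size() - 1;
                if (last >= 0 && cmp.compare(out.get(last), v) == 0)
                    out.set(last, resolver.apply(out.get(last), v)); // duplicate key: merge
                else
                    out.add(v);
            }
            return out;
        }

        public static void main(String[] args)
        {
            List<String> rows = Arrays.asList("a", "b", "b", "c");
            System.out.println(buildResolved(rows, Comparator.naturalOrder(), (x, y) -> x + "+" + y));
            // -> [a, b+b, c]
        }
    }
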
diff --git a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
index 8d96f1e..8db5ee4 100644
--- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@ -108,7 +108,7 @@ public class ImmutableBTreePartition extends AbstractBTreePartition
      */
     public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
     {
-        return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered));
+        return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered, null));
     }
 
     protected Holder holder()
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f476f5b..3560e90 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -36,6 +37,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 
@@ -214,7 +216,26 @@ public class PartitionUpdate extends AbstractBTreePartition
      */
     public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
     {
-        Holder holder = build(iterator, 16);
+        return fromIterator(iterator, true,  null);
+    }
+
+    private static final NoSpamLogger rowMergingLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+    /**
+     * Removes duplicate rows from the incoming iterator; to be used when we can't trust the underlying iterator (e.g. when reading legacy sstables)
+     */
+    public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator)
+    {
+        return fromIterator(iterator, false, (a, b) -> {
+            CFMetaData cfm = iterator.metadata();
+            rowMergingLogger.warn(String.format("Merging rows from pre-3.0 iterator for partition key: %s",
+                                                cfm.getKeyValidator().getString(iterator.partitionKey().getKey())));
+            return Rows.merge(a, b, FBUtilities.nowInSeconds());
+        });
+    }
+
+    private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
+    {
+        Holder holder = build(iterator, 16, ordered, quickResolver);
         MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
         return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
     }
@@ -746,7 +767,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
             {
                 assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
-                return PartitionUpdate.fromIterator(iterator);
+                return PartitionUpdate.fromPre30Iterator(iterator);
             }
         }
 
diff --git a/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java b/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
new file mode 100644
index 0000000..7a6f7f9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.transform;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DuplicateRowChecker extends Transformation<BaseRowIterator<?>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(DuplicateRowChecker.class);
+
+    Clustering previous = null;
+    int duplicatesDetected = 0;
+
+    final String stage;
+    final List<InetAddress> replicas;
+    final CFMetaData metadata;
+    final DecoratedKey key;
+    final boolean snapshotOnDuplicate;
+
+    DuplicateRowChecker(final DecoratedKey key,
+                        final CFMetaData metadata,
+                        final String stage,
+                        final boolean snapshotOnDuplicate,
+                        final List<InetAddress> replicas)
+    {
+        this.key = key;
+        this.metadata = metadata;
+        this.stage = stage;
+        this.snapshotOnDuplicate = snapshotOnDuplicate;
+        this.replicas = replicas;
+    }
+
+    protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+    {
+        return deletionTime;
+    }
+
+    protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+    {
+        return marker;
+    }
+
+    protected Row applyToStatic(Row row)
+    {
+        return row;
+    }
+
+    protected Row applyToRow(Row row)
+    {
+        if (null != previous && row.clustering().equals(previous))
+            duplicatesDetected++;
+        previous = row.clustering();
+        return row;
+    }
+
+    protected void onPartitionClose()
+    {
+        if (duplicatesDetected > 0)
+        {
+            logger.warn("Detected {} duplicate rows for {} during {}",
+                        duplicatesDetected,
+                        metadata.getKeyValidator().getString(key.getKey()),
+                        stage);
+            if (snapshotOnDuplicate)
+                DiagnosticSnapshotService.duplicateRows(metadata, replicas);
+        }
+        duplicatesDetected = 0;
+        previous = null;
+        super.onPartitionClose();
+    }
+
+    public static UnfilteredPartitionIterator duringCompaction(final UnfilteredPartitionIterator iterator, OperationType type)
+    {
+        if (!DatabaseDescriptor.checkForDuplicateRowsDuringCompaction())
+            return iterator;
+        final List<InetAddress> address = Collections.singletonList(FBUtilities.getBroadcastAddress());
+        final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+        return Transformation.apply(iterator, new Transformation<UnfilteredRowIterator>()
+        {
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+                                                                               partition.metadata(),
+                                                                               type.toString(),
+                                                                               snapshot,
+                                                                               address));
+            }
+        });
+    }
+
+    public static PartitionIterator duringRead(final PartitionIterator iterator, final List<InetAddress> replicas)
+    {
+        if (!DatabaseDescriptor.checkForDuplicateRowsDuringReads())
+            return iterator;
+        final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+        return Transformation.apply(iterator, new Transformation<RowIterator>()
+        {
+            protected RowIterator applyToPartition(RowIterator partition)
+            {
+                return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+                                                                               partition.metadata(),
+                                                                               "Read",
+                                                                               snapshot,
+                                                                               replicas));
+            }
+        });
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 516384a..71eb0bc 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -32,6 +32,10 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -132,6 +136,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
             : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
     }
 
+
     public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         awaitResults();
@@ -139,7 +144,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
         PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
         if (logger.isTraceEnabled())
             logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-        return result;
+        return DuplicateRowChecker.duringRead(result, endpoints);
     }
 
     public int blockFor()
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index a997533..179abeb 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
 
 public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
@@ -38,6 +39,10 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
         {
             Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
         }
+        else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
+        {
+            DiagnosticSnapshotService.snapshot(command, message.from);
+        }
         else
             Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
         logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31898e6..19cd901 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2685,4 +2685,58 @@ public class StorageProxy implements StorageProxyMBean
     public void setOtcBacklogExpirationInterval(int intervalInMillis) {
         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
     }
+
+    @Override
+    public boolean getSnapshotOnDuplicateRowDetectionEnabled()
+    {
+        return DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+    }
+
+    @Override
+    public void enableSnapshotOnDuplicateRowDetection()
+    {
+        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+    }
+
+    @Override
+    public void disableSnapshotOnDuplicateRowDetection()
+    {
+        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false);
+    }
+
+    @Override
+    public boolean getCheckForDuplicateRowsDuringReads()
+    {
+        return DatabaseDescriptor.checkForDuplicateRowsDuringReads();
+    }
+
+    @Override
+    public void enableCheckForDuplicateRowsDuringReads()
+    {
+        DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
+    }
+
+    @Override
+    public void disableCheckForDuplicateRowsDuringReads()
+    {
+        DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false);
+    }
+
+    @Override
+    public boolean getCheckForDuplicateRowsDuringCompaction()
+    {
+        return DatabaseDescriptor.checkForDuplicateRowsDuringCompaction();
+    }
+
+    @Override
+    public void enableCheckForDuplicateRowsDuringCompaction()
+    {
+        DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
+    }
+
+    @Override
+    public void disableCheckForDuplicateRowsDuringCompaction()
+    {
+        DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ee82a5b..047934c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+
 public interface StorageProxyMBean
 {
     public long getTotalHints();
@@ -64,4 +66,15 @@ public interface StorageProxyMBean
 
     /** Returns each live node's schema version */
     public Map<String, List<String>> getSchemaVersions();
+
+    void enableSnapshotOnDuplicateRowDetection();
+    void disableSnapshotOnDuplicateRowDetection();
+    boolean getSnapshotOnDuplicateRowDetectionEnabled();
+
+    boolean getCheckForDuplicateRowsDuringReads();
+    void enableCheckForDuplicateRowsDuringReads();
+    void disableCheckForDuplicateRowsDuringReads();
+    boolean getCheckForDuplicateRowsDuringCompaction();
+    void enableCheckForDuplicateRowsDuringCompaction();
+    void disableCheckForDuplicateRowsDuringCompaction();
 }
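
These MBean operations make all three switches toggleable on a live node without a restart. A sketch of driving them from a standalone JMX client (assumes the default unauthenticated JMX endpoint on port 7199; the object name is StorageProxy's standard registration):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ToggleDuplicateRowChecks
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName storageProxy = new ObjectName("org.apache.cassandra.db:type=StorageProxy");

                // Turn on diagnostic snapshots for detected duplicate rows.
                mbs.invoke(storageProxy, "enableSnapshotOnDuplicateRowDetection",
                           new Object[0], new String[0]);

                // No-arg getX() methods surface as JMX attributes rather than operations.
                Object enabled = mbs.getAttribute(storageProxy, "SnapshotOnDuplicateRowDetectionEnabled");
                System.out.println("snapshot_on_duplicate_row_detection: " + enabled);
            }
        }
    }
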
diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
new file mode 100644
index 0000000..5c48412
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.net.InetAddress;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Provides a means to take snapshots when triggered by anomalous events or when the breaking of invariants is
+ * detected. When debugging certain classes of problems, having access to the relevant set of sstables when the problem
+ * is detected (or as close to then as possible) can be invaluable.
+ *
+ * This class performs two functions: on a replica where an anomaly is detected, it provides methods to issue snapshot
+ * requests to a provided set of replicas. For instance, if rows with duplicate clusterings are detected
+ * (CASSANDRA-15789) during a read, a snapshot request will be issued to all participating replicas. If detected during
+ * compaction, only the replica itself will receive the request. Requests are issued at a maximum rate of 1 per minute
+ * for any given table. Any additional triggers for the same table during the 60 second window are dropped, regardless
+ * of the replica set. This window is configurable via a system property (cassandra.diagnostic_snapshot_interval_nanos),
+ * but this is intended for use in testing only and operators are not expected to override the default.
+ *
+ * The second function performed is to handle snapshot requests on replicas. Snapshot names are prefixed with strings
+ * specific to the reason which triggered them. To manage consumption of disk space, replicas are restricted to taking
+ * a single snapshot for each prefix in a single calendar day. So if duplicate rows are detected by multiple
+ * coordinators during reads with the same replica set (or overlapping sets) on the same table, the coordinators may
+ * each issue snapshot requests, but the replicas will only accept the first one they receive. Further requests will
+ * be dropped on the replica side.
+ */
+public class DiagnosticSnapshotService
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiagnosticSnapshotService.class);
+
+    public static final DiagnosticSnapshotService instance =
+        new DiagnosticSnapshotService(Executors.newSingleThreadExecutor(new NamedThreadFactory("DiagnosticSnapshot")));
+
+    public static final String DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX = "DuplicateRows-";
+
+    private final Executor executor;
+
+    private DiagnosticSnapshotService(Executor executor)
+    {
+        this.executor = executor;
+    }
+
+    // Issue at most 1 snapshot request per minute for any given table.
+    // Replicas will only create one snapshot per day, but this stops us
+    // from swamping the network.
+    // Overridable via system property for testing.
+    private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
+    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
+    private final ConcurrentHashMap<UUID, AtomicLong> lastSnapshotTimes = new ConcurrentHashMap<>();
+
+    public static void duplicateRows(CFMetaData metadata, Iterable<InetAddress> replicas)
+    {
+        instance.maybeTriggerSnapshot(metadata, DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas);
+    }
+
+    public static boolean isDiagnosticSnapshotRequest(SnapshotCommand command)
+    {
+        return command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
+    }
+
+    public static void snapshot(SnapshotCommand command, InetAddress initiator)
+    {
+        Preconditions.checkArgument(isDiagnosticSnapshotRequest(command));
+        instance.maybeSnapshot(command, initiator);
+    }
+
+    public static String getSnapshotName(String prefix)
+    {
+        return String.format("%s%s", prefix, DATE_FORMAT.format(LocalDate.now()));
+    }
+
+    @VisibleForTesting
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
+    }
+
+    private void maybeTriggerSnapshot(CFMetaData metadata, String prefix, Iterable<InetAddress> endpoints)
+    {
+        long now = System.nanoTime();
+        AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.cfId, u -> new AtomicLong(0));
+        long last = cached.get();
+        long interval = Long.getLong("cassandra.diagnostic_snapshot_interval_nanos", SNAPSHOT_INTERVAL_NANOS);
+        if (now - last > interval && cached.compareAndSet(last, now))
+        {
+            MessageOut<?> msg = new SnapshotCommand(metadata.ksName,
+                                                    metadata.cfName,
+                                                    getSnapshotName(prefix),
+                                                    false).createMessage();
+            for (InetAddress replica : endpoints)
+                MessagingService.instance().sendOneWay(msg, replica);
+        }
+        else
+        {
+            logger.debug("Diagnostic snapshot request dropped due to throttling");
+        }
+    }
+
+    private void maybeSnapshot(SnapshotCommand command, InetAddress initiator)
+    {
+        executor.execute(new DiagnosticSnapshotTask(command, initiator));
+    }
+
+    private static class DiagnosticSnapshotTask implements Runnable
+    {
+        final SnapshotCommand command;
+        final InetAddress from;
+
+        DiagnosticSnapshotTask(SnapshotCommand command, InetAddress from)
+        {
+            this.command = command;
+            this.from = from;
+        }
+
+        public void run()
+        {
+            try
+            {
+                Keyspace ks = Keyspace.open(command.keyspace);
+                if (ks == null)
+                {
+                    logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
+                                from,
+                                command.keyspace,
+                                command.column_family);
+                    return;
+                }
+
+                ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
+                if (cfs.snapshotExists(command.snapshot_name))
+                {
+                    logger.info("Received diagnostic snapshot request from {} for {}.{}, " +
+                                "but snapshot with tag {} already exists",
+                                from,
+                                command.keyspace,
+                                command.column_family,
+                                command.snapshot_name);
+                    return;
+                }
+                logger.info("Creating snapshot requested by {} of {}.{} tag: {}",
+                            from,
+                            command.keyspace,
+                            command.column_family,
+                            command.snapshot_name);
+                cfs.snapshot(command.snapshot_name);
+            }
+            catch (IllegalArgumentException e)
+            {
+                logger.warn("Snapshot request received from {} for {}.{} but CFS not found",
+                            from,
+                            command.keyspace,
+                            command.column_family);
+            }
+        }
+    }
+}
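
The once-per-day replica-side limit falls out of the snapshot naming: getSnapshotName embeds the calendar date, so every request for the same prefix on the same day carries an identical tag, and all but the first are dropped by the snapshotExists check in DiagnosticSnapshotTask. For illustration:

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class SnapshotTagExample
    {
        public static void main(String[] args)
        {
            // Mirrors getSnapshotName(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX):
            // BASIC_ISO_DATE renders as yyyyMMdd, so one tag is shared per day.
            String tag = "DuplicateRows-" + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
            System.out.println(tag); // e.g. DuplicateRows-20200520
        }
    }
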
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ed14404..d23eec0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -99,6 +99,7 @@ import org.apache.cassandra.tools.NodeTool;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
@@ -700,7 +701,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
-                                () -> SSTableReader.shutdownBlocking(1L, MINUTES)
+                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
+                                () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
             );
             error = parallelRun(error, executor,
                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index cc50053..e9391e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -18,12 +18,19 @@
 
 package org.apache.cassandra.distributed.upgrade;
 
+import java.util.Arrays;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
 import org.junit.Test;
 
+import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.Versions;
 
+import static org.junit.Assert.fail;
+
 public class MixedModeReadRepairTest extends UpgradeTestBase
 {
     @Test
@@ -49,4 +56,82 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
         .runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
         .run();
     }
+
+    @Test
+    public void mixedModeReadRepairDuplicateRows() throws Throwable
+    {
+        final String[] workload1 = new String[]
+        {
+            "DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 1 AND ck = 2;",
+            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, {'a':'b'}) USING TIMESTAMP 3;",
+            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'c':'d'}) USING TIMESTAMP 3;",
+            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, {'e':'f'}) USING TIMESTAMP 3;",
+        };
+
+        final String[] workload2 = new String[]
+        {
+            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'g':'h'}) USING TIMESTAMP 5;",
+        };
+
+        new TestCase()
+        .nodes(2)
+        .upgrade(Versions.Major.v22, Versions.Major.v30)
+        .setup((cluster) ->
+        {
+            cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck));");
+        })
+        .runAfterNodeUpgrade((cluster, node) ->
+        {
+            if (node == 2)
+                return;
+
+            // now node1 is 3.0 and node2 is 2.2
+            for (int i = 0; i < workload1.length; i++ )
+                cluster.coordinator(2).execute(workload1[i], ConsistencyLevel.QUORUM);
+
+            cluster.get(1).flush(KEYSPACE);
+            cluster.get(2).flush(KEYSPACE);
+
+            validate(cluster, 2, false);
+
+            for (int i = 0; i < workload2.length; i++ )
+                cluster.coordinator(2).execute(workload2[i], ConsistencyLevel.QUORUM);
+
+            cluster.get(1).flush(KEYSPACE);
+            cluster.get(2).flush(KEYSPACE);
+
+            validate(cluster, 1, true);
+        })
+        .run();
+    }
+
+    private void validate(UpgradeableCluster cluster, int nodeid, boolean local)
+    {
+        String query = "SELECT * FROM " + KEYSPACE + ".tbl";
+
+        Iterator<Object[]> iter = local
+                                ? Iterators.forArray(cluster.get(nodeid).executeInternal(query))
+                                : cluster.coordinator(nodeid).executeWithPaging(query, ConsistencyLevel.ALL, 2);
+
+        Object[] prevRow = null;
+        Object prevClustering = null;
+
+        while (iter.hasNext())
+        {
+            Object[] row = iter.next();
+            Object clustering = row[1];
+
+            if (clustering.equals(prevClustering))
+            {
+                fail(String.format("Duplicate rows on node %d in %s mode: \n%s\n%s",
+                                   nodeid,
+                                   local ? "local" : "distributed",
+                                   Arrays.toString(prevRow),
+                                   Arrays.toString(row)));
+            }
+
+            prevRow = row;
+            prevClustering = clustering;
+        }
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 3567453..8de3e4c 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.distributed.shared.Builder;
@@ -174,7 +175,7 @@ public class UpgradeTestBase extends DistributedTestBase
 
                     for (Version version : upgrade.upgrade)
                     {
-                        for (int n : nodesToUpgrade)
+                        for (int n=1; n<=nodesToUpgrade.size(); n++)
                         {
                             cluster.get(n).shutdown().get();
                             cluster.get(n).setVersion(version);
diff --git a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index 7f3b424..0bb2459 100644
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@ -25,6 +25,8 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
@@ -370,4 +372,39 @@ public class LegacyLayoutTest
         LegacyLayout.fromUnfilteredRowIterator(null, p.unfilteredIterator());
         LegacyLayout.serializedSizeAsLegacyPartition(null, p.unfilteredIterator(), VERSION_21);
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testCellGrouper()
+    {
+        // CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))
+        CFMetaData cfm = CFMetaData.Builder.create("ks", "table")
+                                           .addPartitionKey("pk", Int32Type.instance)
+                                           .addClusteringColumn("ck", Int32Type.instance)
+                                           .addRegularColumn("v", MapType.getInstance(UTF8Type.instance, UTF8Type.instance, true))
+                                           .build();
+        SerializationHelper helper = new SerializationHelper(cfm, MessagingService.VERSION_22, SerializationHelper.Flag.LOCAL, ColumnFilter.all(cfm));
+        LegacyLayout.CellGrouper cg = new LegacyLayout.CellGrouper(cfm, helper);
+
+        Slice.Bound startBound = Slice.Bound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+        Slice.Bound endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+        LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+        LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+        LegacyLayout.LegacyRangeTombstone lrt = new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(2, 1588598040));
+        assertTrue(cg.addAtom(lrt));
+
+        // add a real cell
+        LegacyLayout.LegacyCell cell = new LegacyLayout.LegacyCell(LegacyLayout.LegacyCell.Kind.REGULAR,
+                                                                   new LegacyLayout.LegacyCellName(new Clustering(ByteBufferUtil.bytes(2)),
+                                                                                                   cfm.getColumnDefinition(ByteBufferUtil.bytes("v")),
+                                                                                                   ByteBufferUtil.bytes("g")),
+                                                                   ByteBufferUtil.bytes("v"), 3, Integer.MAX_VALUE, 0);
+        assertTrue(cg.addAtom(cell));
+
+        // add legacy range tombstone where collection name is null for the end bound (this gets translated to a row tombstone)
+        startBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+        endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+        start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+        end = new LegacyLayout.LegacyBound(endBound, false, null);
+        assertTrue(cg.addAtom(new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(1, 1588598040))));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
new file mode 100644
index 0000000..549a94d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued;
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.iter;
+import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CompactionIteratorTest extends CQLTester
+{
+    @Test
+    public void duplicateRowsTest() throws Throwable
+    {
+        System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+        // Create a table and insert some data. The actual rows read in the test will be synthetic
+        // but this creates an sstable on disk to be snapshotted.
+        createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+        flush();
+
+        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
+
+        final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>();
+        IMessageSink sink = new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                sentMessages.put(to, message);
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        };
+        MessagingService.instance().addMessageSink(sink);
+
+        // no duplicates
+        sentMessages.clear();
+        iterate(cfs, iter(metadata,
+                          false,
+                          makeRow(metadata,0, 0),
+                          makeRow(metadata,0, 1),
+                          makeRow(metadata,0, 2)));
+        assertCommandIssued(sentMessages, false);
+
+        // now test with a duplicate row and see that we issue a snapshot command
+        sentMessages.clear();
+        iterate(cfs, iter(metadata,
+                          false,
+                          makeRow(metadata, 0, 0),
+                          makeRow(metadata, 0, 1),
+                          makeRow(metadata, 0, 1)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    private void iterate(ColumnFamilyStore cfs, UnfilteredPartitionIterator partitions)
+    {
+
+        try (CompactionController controller = new CompactionController(getCurrentColumnFamilyStore(), Integer.MAX_VALUE);
+             ISSTableScanner scanner = scanner(cfs, partitions);
+             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                              Collections.singletonList(scanner),
+                                                              controller, FBUtilities.nowInSeconds(), null))
+        {
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    partition.forEachRemaining(u -> {});
+                }
+            }
+        }
+    }
+
+    private ISSTableScanner scanner(final ColumnFamilyStore cfs, final UnfilteredPartitionIterator partitions)
+    {
+
+        return new ISSTableScanner()
+        {
+            public long getLengthInBytes() { return 0; }
+
+            public long getCurrentPosition() { return 0; }
+
+            public String getBackingFiles() { return cfs.getLiveSSTables().iterator().next().toString(); }
+
+            public boolean isForThrift() { return false; }
+
+            public CFMetaData metadata() { return cfs.metadata; }
+
+            public void close() { }
+
+            public boolean hasNext() { return partitions.hasNext(); }
+
+            public UnfilteredRowIterator next() { return partitions.next(); }
+        };
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 7dff91f..2bd685c 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@ -17,15 +17,42 @@
  */
 package org.apache.cassandra.db.partition;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowAndDeletionMergeIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
 
 import junit.framework.Assert;
 
+import static org.junit.Assert.assertEquals;
+
+
 public class PartitionUpdateTest extends CQLTester
 {
     @Test
@@ -85,4 +112,121 @@ public class PartitionUpdateTest extends CQLTester
         update = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), "key0").buildUpdate();
         Assert.assertEquals(0, update.operationCount());
     }
+
+    /**
+     * Makes sure we merge duplicate rows, see CASSANDRA-15789
+     */
+    @Test
+    public void testDuplicate()
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+        CFMetaData cfm = currentTableMetadata();
+
+        DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+
+        List<Row> rows = new ArrayList<>();
+        Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+        builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
+
+        Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+        builder.addCell(c);
+
+        Row r = builder.build();
+        rows.add(r);
+
+        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+        builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+        r = builder.build();
+        rows.add(r);
+
+        RowAndDeletionMergeIterator rmi = new RowAndDeletionMergeIterator(cfm,
+                                                                          dk,
+                                                                          DeletionTime.LIVE,
+                                                                          ColumnFilter.all(cfm),
+                                                                          Rows.EMPTY_STATIC_ROW,
+                                                                          false,
+                                                                          EncodingStats.NO_STATS,
+                                                                          rows.iterator(),
+                                                                          Collections.emptyIterator(),
+                                                                          true);
+
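+        // fromPre30Iterator is expected to merge the rows sharing a clustering into one.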
+        PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi);
+        pu.iterator();
+
+        Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+        m.add(pu);
+        m.apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
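+        // The flushed sstable should contain a single, merged row.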
+        SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        int count = 0;
+        try (ISSTableScanner scanner = sst.getScanner())
+        {
+            while (scanner.hasNext())
+            {
+                try (UnfilteredRowIterator iter = scanner.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        iter.next();
+                        count++;
+                    }
+                }
+            }
+        }
+        assertEquals(1, count);
+    }
+
+    /**
+     * Makes sure we don't create duplicate rows when merging two partition updates, see CASSANDRA-15789
+     */
+    @Test
+    public void testMerge()
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+        CFMetaData cfm = currentTableMetadata();
+
+        DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+
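+        // Create two single-row updates that share the same clustering (ck=2).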
+        Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+        builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
+        Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+        builder.addCell(c);
+        Row r = builder.build();
+
+        PartitionUpdate p1 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+        p1.add(r);
+
+        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
+        builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+        r = builder.build();
+        PartitionUpdate p2 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+        p2.add(r);
+
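+        // merge() must combine the updates without emitting the shared clustering twice.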
+        Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+        m.add(PartitionUpdate.merge(Lists.newArrayList(p1, p2)));
+        m.apply();
+
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        int count = 0;
+        try (ISSTableScanner scanner = sst.getScanner())
+        {
+            while (scanner.hasNext())
+            {
+                try (UnfilteredRowIterator iter = scanner.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        iter.next();
+                        count++;
+                    }
+                }
+            }
+        }
+        assertEquals(1, count);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
new file mode 100644
index 0000000..78a0c8c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.transform;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import org.junit.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DuplicateRowCheckerTest extends CQLTester
+{
+    ColumnFamilyStore cfs;
+    CFMetaData metadata;
+    static HashMap<InetAddress, MessageOut> sentMessages;
+
+    @BeforeClass
+    public static void setupMessaging()
+    {
+        sentMessages = new HashMap<>();
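+        // Record outgoing messages (and drop them) so the tests can assert on snapshot requests.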
+        IMessageSink sink = new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                sentMessages.put(to, message);
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        };
+        MessagingService.instance().addMessageSink(sink);
+    }
+
+    @Before
+    public void setup() throws Throwable
+    {
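+        // Enable snapshots on duplicate row detection and disable the snapshot rate limit
+        // so that every detection in these tests results in a request.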
+        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+        System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+        // Create a table and insert some data. The actual rows read in the test will be synthetic
+        // but this creates an sstable on disk to be snapshotted.
+        createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        metadata = getCurrentColumnFamilyStore().metadata;
+        cfs = getCurrentColumnFamilyStore();
+        sentMessages.clear();
+    }
+
+    @Test
+    public void noDuplicates()
+    {
+        // distinct clusterings throughout, so no snapshot request should be issued
+        iterate(iter(metadata,
+                     false,
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 2)));
+        assertCommandIssued(sentMessages, false);
+    }
+
+    @Test
+    public void singleDuplicateForward()
+    {
+        iterate(iter(metadata,
+                     false,
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    @Test
+    public void singleDuplicateReverse()
+    {
+        iterate(iter(metadata,
+                     true,
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    @Test
+    public void multipleContiguousForward()
+    {
+        iterate(iter(metadata,
+                     false,
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    @Test
+    public void multipleContiguousReverse()
+    {
+        iterate(iter(metadata,
+                     true,
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 1)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    @Test
+    public void multipleDisjointForward()
+    {
+        iterate(iter(metadata,
+                     false,
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 2),
+                     makeRow(metadata, 0, 2)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    @Test
+    public void multipleDisjointReverse()
+    {
+        iterate(iter(metadata,
+                     true,
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 0),
+                     makeRow(metadata, 0, 1),
+                     makeRow(metadata, 0, 2),
+                     makeRow(metadata, 0, 2)));
+        assertCommandIssued(sentMessages, true);
+    }
+
+    public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected)
+    {
+        assertEquals(isExpected, !sent.isEmpty());
+        if (isExpected)
+        {
+            assertEquals(1, sent.size());
+            assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress()));
+            SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload;
+            assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX));
+        }
+    }
+
+    private void iterate(UnfilteredPartitionIterator iter)
+    {
+        try (PartitionIterator partitions = applyChecker(iter))
+        {
+            while (partitions.hasNext())
+            {
+                try (RowIterator partition = partitions.next())
+                {
+                    partition.forEachRemaining(u -> {});
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+
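+    // Synthesizes a live, cell-less row whose clustering is built from the given values.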
+    public static Row makeRow(CFMetaData metadata, Object... clusteringValues)
+    {
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+        return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, 0, 0));
+    }
+
+    private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered)
+    {
+        int nowInSecs = 0;
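+        // Wrap the filtered partitions with the read-path checker; the local broadcast
+        // address acts as the sole replica to receive a snapshot request on detection.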
+        return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs),
+                                              Collections.singletonList(FBUtilities.getBroadcastAddress()));
+    }
+
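+    // Builds a synthetic single-partition iterator over the given unfiltereds, in the supplied order.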
+    public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds)
+    {
+        DecoratedKey key = metadata.partitioner.decorateKey(bytes("key"));
+        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(metadata,
+                                                                          key,
+                                                                          DeletionTime.LIVE,
+                                                                          metadata.partitionColumns(),
+                                                                          Rows.EMPTY_STATIC_ROW,
+                                                                          isReversedOrder,
+                                                                          EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
+}

