You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 07:08:19 UTC

[cassandra] branch cassandra-3.11 updated (406a859 -> e1a0db7)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 406a859  Fix CQL formatting of read command restrictions for slow query log
     new 4d42c18  Avoid creating duplicate rows during major upgrades
     new e1a0db7  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  18 ++
 .../cassandra/config/DatabaseDescriptor.java       |  31 +++
 src/java/org/apache/cassandra/db/LegacyLayout.java |  18 +-
 .../db/compaction/CompactionIterator.java          |   5 +-
 .../db/partitions/AbstractBTreePartition.java      |   5 +-
 .../db/partitions/ImmutableBTreePartition.java     |   2 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |  26 ++-
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../org/apache/cassandra/service/ReadCallback.java |   4 +-
 .../cassandra/service/SnapshotVerbHandler.java     |   5 +
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  13 ++
 .../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |  85 +++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   3 +-
 .../org/apache/cassandra/db/LegacyLayoutTest.java  |  39 +++-
 .../db/compaction/CompactionIteratorTest.java      |  80 ++++++-
 .../db/partition/PartitionUpdateTest.java          | 144 ++++++++++++
 .../db/transform/DuplicateRowCheckerTest.java      | 246 +++++++++++++++++++++
 21 files changed, 1092 insertions(+), 18 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
 create mode 100644 src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
 create mode 100644 test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e1a0db798aedc6fcbb02e3076d545581bad28b0e
Merge: 406a859 4d42c18
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed May 20 08:44:06 2020 +0200

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  18 ++
 .../cassandra/config/DatabaseDescriptor.java       |  31 +++
 src/java/org/apache/cassandra/db/LegacyLayout.java |  18 +-
 .../db/compaction/CompactionIterator.java          |   5 +-
 .../db/partitions/AbstractBTreePartition.java      |   5 +-
 .../db/partitions/ImmutableBTreePartition.java     |   2 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |  26 ++-
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../org/apache/cassandra/service/ReadCallback.java |   4 +-
 .../cassandra/service/SnapshotVerbHandler.java     |   5 +
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  13 ++
 .../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |  85 +++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   3 +-
 .../org/apache/cassandra/db/LegacyLayoutTest.java  |  39 +++-
 .../db/compaction/CompactionIteratorTest.java      |  80 ++++++-
 .../db/partition/PartitionUpdateTest.java          | 144 ++++++++++++
 .../db/transform/DuplicateRowCheckerTest.java      | 246 +++++++++++++++++++++
 21 files changed, 1092 insertions(+), 18 deletions(-)

diff --cc CHANGES.txt
index 46625b3,b875ae1..3506589
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.21
 +3.11.7
 + * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
 + * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
 +Merged from 3.0:
+  * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
   * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
   * Fix Debian init start/stop (CASSANDRA-15770)
   * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
diff --cc src/java/org/apache/cassandra/config/Config.java
index 7f28546,6003bd1..322f1f5
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -397,12 -362,29 +397,30 @@@ public class Confi
      }
  
      /**
+      * If true, when rows with duplicate clustering keys are detected during a read or compaction
+      * a snapshot will be taken. In the read case, a snapshot request will be issued to each
+      * replica involved in the query, for compaction the snapshot will be created locally.
+      * These are limited at the replica level so that only a single snapshot per-day can be taken
+      * via this method.
+      *
+      * This requires check_for_duplicate_rows_during_reads and/or check_for_duplicate_rows_during_compaction
+      * below to be enabled
+      */
+     public volatile boolean snapshot_on_duplicate_row_detection = false;
 -
+     /**
+      * If these are enabled duplicate keys will get logged, and if snapshot_on_duplicate_row_detection
+      * is enabled, the table will get snapshotted for offline investigation
+      */
+     public volatile boolean check_for_duplicate_rows_during_reads = true;
+     public volatile boolean check_for_duplicate_rows_during_compaction = true;
+ 
 -    public static boolean isClientMode()
 -    {
 -        return isClientMode;
 -    }
 -
++    /**
 +     * Client mode means that the process is a pure client, that uses C* code base but does
 +     * not read or write local C* database files.
 +     *
 +     * @deprecated migrate to {@link DatabaseDescriptor#clientInitialization(boolean)}
 +     */
 +    @Deprecated
      public static void setClientMode(boolean clientMode)
      {
          isClientMode = clientMode;
diff --cc src/java/org/apache/cassandra/db/LegacyLayout.java
index 09f9cfa,37cc935..4ec0c30
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@@ -28,8 -28,9 +28,10 @@@ import java.util.stream.Collectors
  
  import org.apache.cassandra.cql3.ColumnIdentifier;
  import org.apache.cassandra.cql3.SuperColumnCompatibility;
 +import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.utils.AbstractIterator;
+ 
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterators;
  import com.google.common.collect.Lists;
  import com.google.common.collect.PeekingIterator;
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 9aba938,b132d90..4460d4d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -17,14 -17,17 +17,16 @@@
   */
  package org.apache.cassandra.db.compaction;
  
 -import java.util.List;
 -import java.util.UUID;
 +import java.util.*;
  import java.util.function.Predicate;
  
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import com.google.common.collect.Ordering;
  
  import org.apache.cassandra.config.CFMetaData;
+ 
+ import org.apache.cassandra.db.transform.DuplicateRowChecker;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.partitions.PurgeFunction;
  import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
  import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@@ -104,8 -106,8 +106,9 @@@ public class CompactionIterator extend
                                               ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
                                               : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
          boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
 +        merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
-         this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+         merged = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+         this.compacted = DuplicateRowChecker.duringCompaction(merged, type);
      }
  
      public boolean isForThrift()
diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index befbfbb,2cd9e97..34d6d46
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@@ -271,12 -284,46 +271,12 @@@ public abstract class AbstractBTreePart
          }
      }
  
 -    public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator
 -    {
 -        private Iterator<Unfiltered> iterator;
 -
 -        protected SliceableIterator(ColumnFilter selection, boolean isReversed)
 -        {
 -            super(selection, isReversed);
 -        }
 -
 -        protected Unfiltered computeNext()
 -        {
 -            if (iterator == null)
 -                iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder);
 -            if (!iterator.hasNext())
 -                return endOfData();
 -            return iterator.next();
 -        }
 -
 -        public Iterator<Unfiltered> slice(Slice slice)
 -        {
 -            return sliceIterator(selection, slice, isReverseOrder, current, staticRow);
 -        }
 -    }
 -
 -    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
 -    {
 -        return new SliceableIterator(columns, reversed);
 -    }
 -
 -    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
 -    {
 -        return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
 -    }
 -
      protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
      {
-         return build(iterator, initialRowCapacity, true);
+         return build(iterator, initialRowCapacity, true, null);
      }
  
-     protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
+     protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
      {
          CFMetaData metadata = iterator.metadata();
          PartitionColumns columns = iterator.columns();
diff --cc src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3409079,3560e90..4aca6d2
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@@ -217,10 -214,28 +219,30 @@@ public class PartitionUpdate extends Ab
       * Warning: this method does not close the provided iterator, it is up to
       * the caller to close it.
       */
 -    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
 +    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter)
      {
 -        return fromIterator(iterator, true,  null);
++
++        return fromIterator(iterator, filter, true,  null);
+     }
+ 
+     private static final NoSpamLogger rowMergingLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+     /**
+      * Removes duplicate rows from incoming iterator, to be used when we can't trust the underlying iterator (like when reading legacy sstables)
+      */
 -    public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator)
++    public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator, ColumnFilter filter)
+     {
 -        return fromIterator(iterator, false, (a, b) -> {
++        return fromIterator(iterator, filter, false, (a, b) -> {
+             CFMetaData cfm = iterator.metadata();
+             rowMergingLogger.warn(String.format("Merging rows from pre 3.0 iterator for partition key: %s",
+                                                 cfm.getKeyValidator().getString(iterator.partitionKey().getKey())));
+             return Rows.merge(a, b, FBUtilities.nowInSeconds());
+         });
+     }
+ 
 -    private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
++    private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
+     {
 +        iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter);
-         Holder holder = build(iterator, 16);
+         Holder holder = build(iterator, 16, ordered, quickResolver);
          MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
          return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
      }
@@@ -911,7 -767,7 +933,7 @@@
              try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
              {
                  assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
-                 return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata()));
 -                return PartitionUpdate.fromPre30Iterator(iterator);
++                return PartitionUpdate.fromPre30Iterator(iterator, ColumnFilter.all(iterator.metadata()));
              }
          }
  
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index 3ef2fac,71eb0bc..b312852
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -34,7 -32,10 +34,8 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.partitions.PartitionIterator;
 -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 -import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.db.transform.DuplicateRowChecker;
 -import org.apache.cassandra.db.transform.Transformation;
  import org.apache.cassandra.exceptions.ReadFailureException;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.exceptions.UnavailableException;
@@@ -144,8 -143,8 +146,8 @@@ public class ReadCallback implements IA
  
          PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
          if (logger.isTraceEnabled())
 -            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
-         return result;
+         return DuplicateRowChecker.duringRead(result, endpoints);
      }
  
      public int blockFor()
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 8678dde,047934c..cdf07f4
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -65,5 -67,14 +67,16 @@@ public interface StorageProxyMBea
      /** Returns each live node's schema version */
      public Map<String, List<String>> getSchemaVersions();
  
 +    public int getNumberOfTables();
++
+     void enableSnapshotOnDuplicateRowDetection();
+     void disableSnapshotOnDuplicateRowDetection();
+     boolean getSnapshotOnDuplicateRowDetectionEnabled();
+ 
+     boolean getCheckForDuplicateRowsDuringReads();
+     void enableCheckForDuplicateRowsDuringReads();
+     void disableCheckForDuplicateRowsDuringReads();
+     boolean getCheckForDuplicateRowsDuringCompaction();
+     void enableCheckForDuplicateRowsDuringCompaction();
+     void disableCheckForDuplicateRowsDuringCompaction();
  }
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 47ad405,d23eec0..5bb449e
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -99,8 -99,11 +99,9 @@@ import org.apache.cassandra.tools.NodeT
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
  import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 -import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.concurrent.Ref;
@@@ -649,9 -698,11 +650,10 @@@ public class Instance extends IsolatedE
                                  () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
                                  () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
                                  () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
 -                                () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                  () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
-                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES)
+                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES),
+                                 () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
              );
              error = parallelRun(error, executor,
                                  () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
diff --cc test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index fc7c4c4,0bb2459..1bc3af6
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@@ -24,8 -24,9 +24,10 @@@ import java.nio.file.Files
  import java.nio.file.Path;
  import java.nio.file.Paths;
  
 +import org.junit.AfterClass;
  import org.apache.cassandra.db.filter.ColumnFilter;
+ import org.apache.cassandra.db.marshal.MapType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
  import org.apache.cassandra.db.rows.BufferCell;
  import org.apache.cassandra.db.rows.Cell;
@@@ -382,4 -372,39 +384,39 @@@ public class LegacyLayoutTes
          LegacyLayout.fromUnfilteredRowIterator(null, p.unfilteredIterator());
          LegacyLayout.serializedSizeAsLegacyPartition(null, p.unfilteredIterator(), VERSION_21);
      }
- }
+ 
+     @Test
+     public void testCellGrouper()
+     {
+         // CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))
+         CFMetaData cfm = CFMetaData.Builder.create("ks", "table")
+                                            .addPartitionKey("pk", Int32Type.instance)
+                                            .addClusteringColumn("ck", Int32Type.instance)
+                                            .addRegularColumn("v", MapType.getInstance(UTF8Type.instance, UTF8Type.instance, true))
+                                            .build();
+         SerializationHelper helper = new SerializationHelper(cfm, MessagingService.VERSION_22, SerializationHelper.Flag.LOCAL, ColumnFilter.all(cfm));
+         LegacyLayout.CellGrouper cg = new LegacyLayout.CellGrouper(cfm, helper);
+ 
 -        Slice.Bound startBound = Slice.Bound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
 -        Slice.Bound endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
++        ClusteringBound startBound = ClusteringBound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
++        ClusteringBound endBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+         LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         LegacyLayout.LegacyRangeTombstone lrt = new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(2, 1588598040));
+         assertTrue(cg.addAtom(lrt));
+ 
+         // add a real cell
+         LegacyLayout.LegacyCell cell = new LegacyLayout.LegacyCell(LegacyLayout.LegacyCell.Kind.REGULAR,
 -                                                                   new LegacyLayout.LegacyCellName(new Clustering(ByteBufferUtil.bytes(2)),
++                                                                   new LegacyLayout.LegacyCellName(Clustering.make(ByteBufferUtil.bytes(2)),
+                                                                                                    cfm.getColumnDefinition(ByteBufferUtil.bytes("v")),
+                                                                                                    ByteBufferUtil.bytes("g")),
+                                                                    ByteBufferUtil.bytes("v"), 3, Integer.MAX_VALUE, 0);
+         assertTrue(cg.addAtom(cell));
+ 
+         // add legacy range tombstone where collection name is null for the end bound (this gets translated to a row tombstone)
 -        startBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
 -        endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
++        startBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
++        endBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)});
+         start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         end = new LegacyLayout.LegacyBound(endBound, false, null);
+         assertTrue(cg.addAtom(new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(1, 1588598040))));
+     }
+ }
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index dc5fd06,549a94d..58c5a00
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@@ -17,379 -17,118 +17,455 @@@
   */
  package org.apache.cassandra.db.compaction;
  
+ import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued;
 -import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.iter;
+ import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow;
++import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.rows;
  import static org.junit.Assert.*;
  
+ import java.net.InetAddress;
  import java.util.*;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import com.google.common.collect.*;
  
  import org.junit.Test;
  
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
- import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.net.*;
++import org.apache.cassandra.net.IMessageSink;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessageOut;
++import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  
- public class CompactionIteratorTest
+ public class CompactionIteratorTest extends CQLTester
  {
 +
 +    private static final int NOW = 1000;
 +    private static final int GC_BEFORE = 100;
 +    private static final String KSNAME = "CompactionIteratorTest";
 +    private static final String CFNAME = "Integer1";
 +
 +    static final DecoratedKey kk;
 +    static final CFMetaData metadata;
 +    private static final int RANGE = 1000;
 +    private static final int COUNT = 100;
 +
 +    Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>();
 +
 +    static {
 +        DatabaseDescriptor.daemonInitialization();
 +
 +        kk = Util.dk("key");
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KSNAME,
 +                                    KeyspaceParams.simple(1),
 +                                    metadata = SchemaLoader.standardCFMD(KSNAME,
 +                                                                         CFNAME,
 +                                                                         1,
 +                                                                         UTF8Type.instance,
 +                                                                         Int32Type.instance,
 +                                                                         Int32Type.instance));
 +    }
 +
 +    // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests.
 +
 +    @Test
 +    public void testGcCompactionSupersedeLeft()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSupersedeMiddle()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<40 60[150]"
 +        }, new String[] {
 +            "7<=[160] 15[180] [160]<=30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSupersedeRight()
 +    {
 +        testCompaction(new String[] {
 +            "9<=[140] 10[150] [140]<40 60[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSwitchInSuperseded()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        5);
 +    }
 +
 +    @Test
 +    public void testGcCompactionBoundaries()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100"
 +        },
 +        7);
 +    }
 +
 +    @Test
 +    public void testGcCompactionMatches()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130"
 +        }, new String[] {
 +            "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130"
 +        },
 +        5);
 +    }
 +
 +    @Test
 +    public void testGcCompactionRowDeletion()
 +    {
 +        testCompaction(new String[] {
 +            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
 +        }, new String[] {
 +            "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
 +        },
 +        "25[160] 30[170] 50[120]");
 +    }
 +
 +    @Test
 +    public void testGcCompactionPartitionDeletion()
 +    {
 +        testCompaction(new String[] {
 +            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
 +        }, new String[] {
 +            // Dxx| stands for partition deletion at time xx
 +            "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
 +        },
 +        "30[170]");
 +    }
 +
 +    void testCompaction(String[] inputs, String[] tombstones, String expected)
 +    {
 +        testNonGcCompaction(inputs, tombstones);
 +
 +        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> inputLists = parse(inputs, generator);
 +        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
 +        List<Unfiltered> result = compact(inputLists, tombstoneLists);
 +        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
 +        generator.verifyValid(result);
 +        verifyEquivalent(inputLists, result, tombstoneLists, generator);
 +        List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
 +        if (!expectedResult.equals(result))
 +            fail("Expected " + expected + ", got " + generator.str(result));
 +    }
 +
 +    void testCompaction(String[] inputs, String[] tombstones, int expectedCount)
 +    {
 +        testNonGcCompaction(inputs, tombstones);
 +
 +        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> inputLists = parse(inputs, generator);
 +        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
 +        List<Unfiltered> result = compact(inputLists, tombstoneLists);
 +        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
 +        generator.verifyValid(result);
 +        verifyEquivalent(inputLists, result, tombstoneLists, generator);
 +        if (size(result) > expectedCount)
 +            fail("Expected compaction with " + expectedCount + " elements, got " + size(result) + ": " + generator.str(result));
 +    }
 +
 +    /**
 +     * Compacts {@code inputs} without handing any tombstone sources to the compaction, validates the
 +     * output and checks it is equivalent to the inputs once {@code tombstones} are merged into both sides.
 +     *
 +     * @param inputs     textual descriptions of the sources to compact
 +     * @param tombstones textual descriptions of the tombstone sources (used for the equivalence check only)
 +     * @return the size of the compacted output, as counted by {@code size}
 +     */
 +    int testNonGcCompaction(String[] inputs, String[] tombstones)
 +    {
 +        UnfilteredRowsGenerator gen = new UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> sources = parse(inputs, gen);
 +        List<List<Unfiltered>> shadows = parse(tombstones, gen);
 +
 +        List<Unfiltered> compacted = compact(sources, Collections.emptyList());
 +        int produced = size(compacted);
 +        System.out.println("Non-GC compaction resulted in " + produced + " Unfiltereds");
 +
 +        gen.verifyValid(compacted);
 +        verifyEquivalent(sources, compacted, shadows, gen);
 +        return produced;
 +    }
 +
 +    /**
 +     * Counts the elements of {@code data}, weighting every {@link RangeTombstoneBoundaryMarker} as two
 +     * and every other unfiltered as one.
 +     */
 +    private static int size(List<Unfiltered> data)
 +    {
 +        int total = 0;
 +        for (Unfiltered item : data)
 +            total += (item instanceof RangeTombstoneBoundaryMarker) ? 2 : 1;
 +        return total;
 +    }
 +
 +    /**
 +     * Fails the test unless compacting {@code sources} together with {@code tombstoneSources} yields the
 +     * same content as compacting {@code result} together with {@code tombstoneSources}. On mismatch both
 +     * sides are dumped to standard output before failing.
 +     */
 +    private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator)
 +    {
 +        // sources + tombstoneSources must be the same as result + tombstoneSources
 +        List<Unfiltered> fromSources = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList());
 +        List<Unfiltered> fromResult = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList());
 +        if (fromSources.equals(fromResult))
 +            return;
 +
 +        System.out.println("Equivalence test failure between sources:");
 +        for (List<Unfiltered> source : sources)
 +            generator.dumpList(source);
 +        System.out.println("and compacted " + generator.str(result));
 +        System.out.println("with tombstone sources:");
 +        for (List<Unfiltered> tombstoneSource : tombstoneSources)
 +            generator.dumpList(tombstoneSource);
 +        System.out.println("expected " + generator.str(fromSources));
 +        System.out.println("got " + generator.str(fromResult));
 +        fail("Failed equivalence test.");
 +    }
 +
 +    /**
 +     * Parses every description in {@code inputs}, in order, into its list of unfiltereds and returns
 +     * the lists as an immutable list.
 +     */
 +    private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator)
 +    {
 +        ImmutableList.Builder<List<Unfiltered>> parsed = ImmutableList.builder();
 +        for (String input : inputs)
 +            parsed.add(parse(input, generator));
 +        return parsed.build();
 +    }
 +
 +    /**
 +     * Parses one source description. A leading {@code Dxx|} prefix is stripped and recorded in
 +     * {@code deletionTimes} as a deletion time of {@code xx} for the parsed list; the remainder
 +     * (or the whole input when there is no prefix) is handed to the generator.
 +     */
 +    private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator)
 +    {
 +        Matcher prefix = Pattern.compile("D(\\d+)\\|").matcher(input);
 +        if (!prefix.lookingAt())
 +            return generator.parse(input, NOW - 1);
 +
 +        int deletedAt = Integer.parseInt(prefix.group(1));
 +        List<Unfiltered> content = generator.parse(input.substring(prefix.end()), NOW - 1);
 +        deletionTimes.put(content, new DeletionTime(deletedAt, deletedAt));
 +        return content;
 +    }
 +
 +    /**
 +     * Runs a {@link CompactionIterator} over {@code sources} (one scanner per source, all under the key
 +     * {@code kk}) with {@code tombstoneSources} exposed through the test {@code Controller}, and returns
 +     * the content of the single partition it produces. Asserts that exactly one partition comes out.
 +     */
 +    private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources)
 +    {
 +        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> shadowsByKey = new TreeMap<>();
 +        shadowsByKey.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk)));
 +
 +        List<Iterable<UnfilteredRowIterator>> scannerInputs =
 +            ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk))));
 +
 +        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), shadowsByKey, GC_BEFORE);
 +             CompactionIterator compaction = new CompactionIterator(OperationType.COMPACTION,
 +                                                                    Lists.transform(scannerInputs, x -> new Scanner(x)),
 +                                                                    controller, NOW, null))
 +        {
 +            assertTrue(compaction.hasNext());
 +
 +            List<Unfiltered> merged = new ArrayList<>();
 +            try (UnfilteredRowIterator partition = compaction.next())
 +            {
 +                partition.forEachRemaining(merged::add);
 +            }
 +
 +            assertFalse(compaction.hasNext());
 +            return merged;
 +        }
 +    }
 +
 +    /**
 +     * Exposes {@code list} as a row iterator for {@code key}, using the deletion time recorded for the
 +     * list in {@code deletionTimes}, or {@link DeletionTime#LIVE} when none was recorded.
 +     */
 +    private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key)
 +    {
 +        DeletionTime partitionDeletion = deletionTimes.getOrDefault(list, DeletionTime.LIVE);
 +        return UnfilteredRowsGenerator.source(list, metadata, key, partitionDeletion);
 +    }
 +
 +    /**
 +     * Builds a key-ordered map of generated partition content. {@code pcount} times, a key is drawn at
 +     * random from {@code keys} and mapped to a freshly generated source of {@code rcount} entries; a key
 +     * drawn more than once keeps only the last generated content.
 +     */
 +    NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator,
 +                                                                 List<DecoratedKey> keys, int pcount, int rcount)
 +    {
 +        NavigableMap<DecoratedKey, List<Unfiltered>> contentByKey = new TreeMap<>();
 +        for (int remaining = pcount; remaining > 0; --remaining)
 +        {
 +            // draw the key before generating the content so the random sequence is consumed in a fixed order
 +            DecoratedKey chosen = keys.get(rand.nextInt(keys.size()));
 +            List<Unfiltered> generated = generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1);
 +            contentByKey.put(chosen, generated);
 +        }
 +        return contentByKey;
 +    }
 +
 +    /**
 +     * Randomised check: for each seed, generates ten data sources and ten tombstone sources, compacts
 +     * the data sources against the tombstone sources and verifies that the result is equivalent to the
 +     * inputs and smaller than the combined size of the data sources.
 +     */
 +    @Test
 +    public void testRandom()
 +    {
 +        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
 +        for (int seed = 1; seed < 100; ++seed)
 +        {
 +            Random rand = new Random(seed);
 +            List<List<Unfiltered>> sources = new ArrayList<>();
 +            for (int i = 0; i < 10; ++i)
 +                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
 +            int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum();
 +            List<List<Unfiltered>> tombSources = new ArrayList<>();
 +            // Fill the tombstone sources; previously this loop appended to 'sources' again, which left
 +            // 'tombSources' empty so the compaction under test never saw any tombstone source.
 +            for (int i = 0; i < 10; ++i)
 +                tombSources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
 +            List<Unfiltered> result = compact(sources, tombSources);
 +            verifyEquivalent(sources, result, tombSources, generator);
 +            assertTrue(size(result) < srcSz);
 +        }
 +    }
 +
 +    /**
 +     * Compaction controller for these tests: instead of consulting sstables it answers
 +     * {@code shadowSources} from a caller-supplied map of per-key iterators.
 +     */
 +    class Controller extends CompactionController
 +    {
 +        private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
 +
 +        /**
 +         * @param cfs              store handed to the parent controller
 +         * @param tombstoneSources iterators to return from {@code shadowSources}, by partition key
 +         * @param gcBefore         value handed to the parent controller
 +         */
 +        public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
 +        {
 +            super(cfs, Collections.emptySet(), gcBefore);
 +            this.tombstoneSources = tombstoneSources;
 +        }
 +
 +        /**
 +         * Returns whatever was registered for {@code key} ({@code null} if nothing was).
 +         * Only tombstone-only requests are expected.
 +         */
 +        @Override
 +        public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
 +        {
 +            assert tombstoneOnly;
 +            Iterable<UnfilteredRowIterator> registered = tombstoneSources.get(key);
 +            return registered;
 +        }
 +    }
 +
 +    /**
 +     * {@link ISSTableScanner} backed by an in-memory sequence of partitions rather than a file:
 +     * iteration is delegated to the supplied content and every size/position accessor reports zero.
 +     */
 +    class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
 +    {
 +        Iterator<UnfilteredRowIterator> iter;
 +
 +        Scanner(Iterable<UnfilteredRowIterator> content)
 +        {
 +            this.iter = content.iterator();
 +        }
 +
 +        // ---- iteration: straight delegation to the wrapped iterator ----
 +
 +        @Override
 +        public boolean hasNext()
 +        {
 +            return iter.hasNext();
 +        }
 +
 +        @Override
 +        public UnfilteredRowIterator next()
 +        {
 +            return iter.next();
 +        }
 +
 +        // ---- table information ----
 +
 +        @Override
 +        public CFMetaData metadata()
 +        {
 +            // the metadata of the enclosing test
 +            return metadata;
 +        }
 +
 +        @Override
 +        public boolean isForThrift()
 +        {
 +            return false;
 +        }
 +
 +        // ---- size and progress: there is no backing file, so everything is zero ----
 +
 +        @Override
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getCompressedLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getBytesScanned()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public String getBackingFiles()
 +        {
 +            return null;
 +        }
 +    }
++
+     /**
+      * Feeds synthetic rows through a {@link CompactionIterator} (see {@code iterate}) and checks with
+      * {@code assertCommandIssued} that a snapshot command is sent only when the input contains a
+      * duplicated row.
+      */
+     @Test
+     public void duplicateRowsTest() throws Throwable
+     {
+         // NOTE(review): presumably removes the minimum interval between diagnostic snapshots so that
+         // every detection in this test can issue one -- confirm against DiagnosticSnapshotService
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+         // Create a table and insert some data. The actual rows read in the test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+         flush();
+ 
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
 -        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+         CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
+ 
+         // Record every outgoing message by destination. Both callbacks return false; presumably that
+         // stops the message from being processed any further (TODO confirm against IMessageSink).
+         final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>();
+         IMessageSink sink = new IMessageSink()
+         {
+             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+             {
+                 sentMessages.put(to, message);
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return false;
+             }
+         };
+         MessagingService.instance().addMessageSink(sink);
+ 
+         // no duplicates
+         sentMessages.clear();
 -        iterate(cfs, iter(metadata,
 -                          false,
 -                          makeRow(metadata,0, 0),
 -                          makeRow(metadata,0, 1),
 -                          makeRow(metadata,0, 2)));
++        iterate(makeRow(metadata,0, 0),
++                makeRow(metadata,0, 1),
++                makeRow(metadata,0, 2));
+         assertCommandIssued(sentMessages, false);
+ 
+         // now test with a duplicate row and see that we issue a snapshot command
+         sentMessages.clear();
 -        iterate(cfs, iter(metadata,
 -                          false,
 -                          makeRow(metadata, 0, 0),
 -                          makeRow(metadata, 0, 1),
 -                          makeRow(metadata, 0, 1)));
++        iterate(makeRow(metadata, 0, 0),
++                makeRow(metadata, 0, 1),
++                makeRow(metadata, 0, 1));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
++    /**
++     * Wraps the given unfiltereds in a single partition under the key "key", runs that partition
++     * through a {@link CompactionIterator} and drains everything the iterator produces.
++     */
 -    private void iterate(ColumnFamilyStore cfs, UnfilteredPartitionIterator partitions)
++    private void iterate(Unfiltered...unfiltereds)
+     {
 -
 -        try (CompactionController controller = new CompactionController(getCurrentColumnFamilyStore(), Integer.MAX_VALUE);
 -             ISSTableScanner scanner = scanner(cfs, partitions);
++        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
++        DecoratedKey key = cfs.metadata.partitioner.decorateKey(ByteBufferUtil.bytes("key"));
++        try (CompactionController controller = new CompactionController(cfs, Integer.MAX_VALUE);
++             UnfilteredRowIterator rows = rows(metadata, key, false, unfiltereds);
++             ISSTableScanner scanner = new Scanner(Collections.singletonList(rows));
+              CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                               Collections.singletonList(scanner),
+                                                               controller, FBUtilities.nowInSeconds(), null))
+         {
+             while (iter.hasNext())
+             {
+                 try (UnfilteredRowIterator partition = iter.next())
+                 {
+                     // consume every element; the content itself is not inspected here
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
 -
 -    private ISSTableScanner scanner(final ColumnFamilyStore cfs, final UnfilteredPartitionIterator partitions)
 -    {
 -
 -        return new ISSTableScanner()
 -        {
 -            public long getLengthInBytes() { return 0; }
 -
 -            public long getCurrentPosition() { return 0; }
 -
 -            public String getBackingFiles() { return cfs.getLiveSSTables().iterator().next().toString(); }
 -
 -            public boolean isForThrift() { return false; }
 -
 -            public CFMetaData metadata() { return cfs.metadata; }
 -
 -            public void close() { }
 -
 -            public boolean hasNext() { return partitions.hasNext(); }
 -
 -            public UnfilteredRowIterator next() { return partitions.next(); }
 -        };
 -    }
  }
diff --cc test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 0330b65,2bd685c..df23e4f
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@@ -17,11 -17,34 +17,35 @@@
   */
  package org.apache.cassandra.db.partition;
  
 +import org.apache.cassandra.UpdateBuilder;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import com.google.common.collect.Lists;
+ 
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.DeletionTime;
+ import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.db.rows.BTreeRow;
+ import org.apache.cassandra.db.rows.BufferCell;
+ import org.apache.cassandra.db.rows.Cell;
+ import org.apache.cassandra.db.rows.CellPath;
+ import org.apache.cassandra.db.rows.EncodingStats;
+ import org.apache.cassandra.db.rows.Row;
+ import org.apache.cassandra.db.rows.RowAndDeletionMergeIterator;
+ import org.apache.cassandra.db.rows.Rows;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.junit.Test;
  
@@@ -84,4 -112,121 +111,121 @@@ public class PartitionUpdateTest extend
          update = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), "key0").buildUpdate();
          Assert.assertEquals(0, update.operationCount());
      }
+ 
+     /**
+      * Makes sure we merge duplicate rows, see CASSANDRA-15789
+      *
+      * Two rows with the same clustering are passed to {@code PartitionUpdate.fromPre30Iterator};
+      * after applying and flushing the update, the sstable must contain a single element.
+      */
+     @Test
+     public void testDuplicate()
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+         CFMetaData cfm = currentTableMetadata();
+ 
+         DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ 
+         List<Row> rows = new ArrayList<>();
+         Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+         // first row: clustering 2, a complex deletion on column v plus one live cell of v
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
+ 
 -        Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
++        Cell c = BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+         builder.addCell(c);
+ 
+         Row r = builder.build();
+         rows.add(r);
+ 
+         // second row: the same clustering 2 again, carrying only a row deletion
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+         r = builder.build();
+         rows.add(r);
+ 
+         RowAndDeletionMergeIterator rmi = new RowAndDeletionMergeIterator(cfm,
+                                                                           dk,
+                                                                           DeletionTime.LIVE,
+                                                                           ColumnFilter.all(cfm),
+                                                                           Rows.EMPTY_STATIC_ROW,
+                                                                           false,
+                                                                           EncodingStats.NO_STATS,
+                                                                           rows.iterator(),
+                                                                           Collections.emptyIterator(),
+                                                                           true);
+ 
 -        PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi);
++        PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi, ColumnFilter.all(cfm));
+         pu.iterator();
+ 
+         Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+         m.add(pu);
+         m.apply();
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         // count every element of every partition in the flushed sstable
+         SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+         int count = 0;
+         try (ISSTableScanner scanner = sst.getScanner())
+         {
+             while (scanner.hasNext())
+             {
+                 try (UnfilteredRowIterator iter = scanner.next())
+                 {
+                     while (iter.hasNext())
+                     {
+                         iter.next();
+                         count++;
+                     }
+                 }
+             }
+         }
+         // the two input rows share a clustering, so exactly one element is expected on disk
+         assertEquals(1, count);
+     }
+ 
+     /**
+      * Makes sure we don't create duplicates when merging 2 partition updates
+      *
+      * Each update holds one row with the same clustering; after merging, applying and flushing,
+      * the sstable must contain a single element.
+      */
+     @Test
+     public void testMerge()
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))");
+         CFMetaData cfm = currentTableMetadata();
+ 
+         DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ 
+         Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
+         // first update: clustering 2, a complex deletion on column v plus one live cell of v
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647));
 -        Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
++        Cell c = BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+         builder.addCell(c);
+         Row r = builder.build();
+ 
+         PartitionUpdate p1 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+         p1.add(r);
+ 
+         // second update: the same clustering 2 again, carrying only a row deletion
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false));
+         r = builder.build();
+         PartitionUpdate p2 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2);
+         p2.add(r);
+ 
+         Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+         m.add(PartitionUpdate.merge(Lists.newArrayList(p1, p2)));
+         m.apply();
+ 
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         // count every element of every partition in the flushed sstable
+         SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+         int count = 0;
+         try (ISSTableScanner scanner = sst.getScanner())
+         {
+             while (scanner.hasNext())
+             {
+                 try (UnfilteredRowIterator iter = scanner.next())
+                 {
+                     while (iter.hasNext())
+                     {
+                         iter.next();
+                         count++;
+                     }
+                 }
+             }
+         }
+         // both updates target the same clustering, so exactly one element is expected on disk
+         assertEquals(1, count);
+     }
  }
diff --cc test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
index 0000000,78a0c8c..432bce3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
@@@ -1,0 -1,240 +1,246 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.transform;
+ 
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.*;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.net.*;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ /**
+  * Tests for {@link DuplicateRowChecker}: synthetic partitions, with and without repeated rows, are
+  * read through the checker and the messages captured by a message sink are inspected to see whether
+  * a snapshot command was issued.
+  */
+ public class DuplicateRowCheckerTest extends CQLTester
+ {
+     ColumnFamilyStore cfs;
+     CFMetaData metadata;
+     // outgoing messages captured by the sink installed in setupMessaging(), keyed by destination
+     static HashMap<InetAddress, MessageOut> sentMessages;
+ 
+     /**
+      * Installs a message sink that records each outgoing message in {@code sentMessages} and returns
+      * false from both callbacks.
+      */
+     @BeforeClass
+     public static void setupMessaging()
+     {
+         sentMessages = new HashMap<>();
+         IMessageSink sink = new IMessageSink()
+         {
+             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+             {
+                 sentMessages.put(to, message);
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return false;
+             }
+         };
+         MessagingService.instance().addMessageSink(sink);
+     }
+ 
+     /**
+      * Enables snapshotting on duplicate row detection, creates and flushes a small table, caches its
+      * metadata and store, and clears any messages captured by a previous test.
+      */
+     @Before
+     public void setup() throws Throwable
+     {
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+         // NOTE(review): presumably removes the minimum interval between diagnostic snapshots -- confirm
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+         // Create a table and insert some data. The actual rows read in the test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         metadata = getCurrentColumnFamilyStore().metadata;
+         cfs = getCurrentColumnFamilyStore();
+         sentMessages.clear();
+     }
+ 
+     @Test
+     public void noDuplicates()
+     {
+         // no duplicates
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, false);
+     }
+ 
+     @Test
+     public void singleDuplicateForward()
+     {
+ 
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void singleDuplicateReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     /**
+      * Asserts that {@code sent} is non-empty exactly when {@code isExpected} is true. When a command
+      * is expected, there must be exactly one message, addressed to the broadcast address, whose
+      * payload is a {@link SnapshotCommand} with a snapshot name starting with the duplicate-rows prefix.
+      */
+     public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected)
+     {
+         assertEquals(isExpected, !sent.isEmpty());
+         if (isExpected)
+         {
+             assertEquals(1, sent.size());
+             assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress()));
+             SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload;
+             assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX));
+         }
+     }
+ 
+     /** Applies the duplicate row checker to {@code iter} and drains every partition it yields. */
+     private void iterate(UnfilteredPartitionIterator iter)
+     {
+         try (PartitionIterator partitions = applyChecker(iter))
+         {
+             while (partitions.hasNext())
+             {
+                 try (RowIterator partition = partitions.next())
+                 {
+                     // consume every row; the content itself is not inspected here
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
+ 
+     /** Serializes {@code value} with {@code type}; the unchecked cast trusts the caller to pair them correctly. */
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     /**
+      * Builds a live row without cells whose clustering is made of {@code clusteringValues}, each
+      * serialized with the type of the clustering column at the same position in {@code metadata}.
+      */
+     public static Row makeRow(CFMetaData metadata, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
 -        return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, 0, 0));
++        return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0, 0));
++    }
++
++    /**
++     * Returns a row iterator for {@code key} that yields {@code unfiltereds} in exactly the order
++     * given, with a live partition deletion, an empty static row and no encoding stats.
++     */
++    public static UnfilteredRowIterator rows(CFMetaData metadata,
++                                             DecoratedKey key,
++                                             boolean isReversedOrder,
++                                             Unfiltered... unfiltereds)
++    {
++        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
++        return new AbstractUnfilteredRowIterator(metadata,
++                                                 key,
++                                                 DeletionTime.LIVE,
++                                                 metadata.partitionColumns(),
++                                                 Rows.EMPTY_STATIC_ROW,
++                                                 isReversedOrder,
++                                                 EncodingStats.NO_STATS)
++        {
++            protected Unfiltered computeNext()
++            {
++                return iterator.hasNext() ? iterator.next() : endOfData();
++            }
++        };
+     }
+ 
+     /**
+      * Filters {@code unfiltered} (with a "now" of 0 seconds) and wraps the result with
+      * {@code DuplicateRowChecker.duringRead}, passing the broadcast address as the only address.
+      */
+     private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered)
+     {
+         int nowInSecs = 0;
+         return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs),
+                                               Collections.singletonList(FBUtilities.getBroadcastAddress()));
+     }
+ 
+     /**
+      * Returns a partition iterator holding a single partition, keyed "key", that yields
+      * {@code unfiltereds} in the order given.
+      */
+     public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         DecoratedKey key = metadata.partitioner.decorateKey(bytes("key"));
 -        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
 -
 -        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(metadata,
 -                                                                          key,
 -                                                                          DeletionTime.LIVE,
 -                                                                          metadata.partitionColumns(),
 -                                                                          Rows.EMPTY_STATIC_ROW,
 -                                                                          isReversedOrder,
 -                                                                          EncodingStats.NO_STATS)
 -        {
 -            protected Unfiltered computeNext()
 -            {
 -                return iterator.hasNext() ? iterator.next() : endOfData();
 -            }
 -        };
 -
++        UnfilteredRowIterator rowIter = rows(metadata, key, isReversedOrder, unfiltereds);
+         return new SingletonUnfilteredPartitionIterator(rowIter, false);
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org