You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2020/11/02 18:37:06 UTC

[cassandra] branch trunk updated: Produce consistent tombstone to avoid digest mismatch:

This is an automated email from the ASF dual-hosted git repository.

jasonstack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56e697d  Produce consistent tombstone to avoid digest mismatch:
56e697d is described below

commit 56e697dc124e9d94581052cca1ba97ad9b1044c5
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Thu Mar 12 00:36:02 2020 +0800

    Produce consistent tombstone to avoid digest mismatch:
    
    * fix memtable read creates row deletion while sstable read creates range tombstone marker.
    * return range tombstone marker for memtable read instead of row deletion
    * remove range-tombstone-marker or row deletion when they don't supersede partition deletion
    
    patch by Zhao Yang; reviewed by Andres de la Peña and Marcus Eriksson
    for CASSANDRA-15369
---
 CHANGES.txt                                        |   2 +-
 .../apache/cassandra/cql3/UpdateParameters.java    |   3 +-
 .../cassandra/db/SinglePartitionReadCommand.java   |  26 ++-
 .../db/filter/ClusteringIndexNamesFilter.java      |  24 +-
 .../db/partitions/AbstractBTreePartition.java      | 144 ++++++++----
 .../db/partitions/AtomicBTreePartition.java        |  10 +-
 .../apache/cassandra/db/partitions/Partition.java  |  21 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     |   3 +-
 .../cassandra/db/rows/RangeTombstoneMarker.java    |   4 +-
 .../db/rows/RowAndDeletionMergeIterator.java       |   4 +-
 .../db/SinglePartitionSliceCommandTest.java        | 250 ++++++++++++++++-----
 .../db/partition/PartitionImplementationTest.java  |  68 +++---
 12 files changed, 375 insertions(+), 184 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 41b23b6..3586d95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,8 @@
 4.0-beta4
+ * Produce consistent tombstone for reads to avoid digest mismatch (CASSANDRA-15369)
  * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235)
  * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
  * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
-
 4.0-beta3
  * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229)
  * Fail truncation requests when they fail on a replica (CASSANDRA-16208)
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index f90958b..84123f3 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -203,7 +202,7 @@ public class UpdateParameters
             return null;
 
         Partition partition = prefetchedRows.get(key);
-        Row prefetchedRow = partition == null ? null : partition.searchIterator(ColumnFilter.selection(partition.columns()), false).next(clustering);
+        Row prefetchedRow = partition == null ? null : partition.getRow(clustering);
 
         // We need to apply the pending mutations to return the row in its current state
         Row pendingMutations = builder.copy().build();
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 62d67f1..0a0c21c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 /**
@@ -928,8 +927,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         if (result == null)
             return filter;
 
-        SearchIterator<Clustering<?>, Row> searchIter = result.searchIterator(columnFilter(), false);
-
         RegularAndStaticColumns columns = columnFilter().fetchedColumns();
         NavigableSet<Clustering<?>> clusterings = filter.requestedRows();
 
@@ -940,20 +937,27 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         boolean removeStatic = false;
         if (!columns.statics.isEmpty())
         {
-            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+            Row staticRow = result.getRow(Clustering.STATIC_CLUSTERING);
             removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
         }
 
         NavigableSet<Clustering<?>> toRemove = null;
-        for (Clustering<?> clustering : clusterings)
+        try (UnfilteredRowIterator iterator = result.unfilteredIterator(columnFilter(), clusterings, false))
         {
-            Row row = searchIter.next(clustering);
-            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
-                continue;
+            while (iterator.hasNext())
+            {
+                Unfiltered unfiltered = iterator.next();
+                if (unfiltered == null || !unfiltered.isRow())
+                    continue;
 
-            if (toRemove == null)
-                toRemove = new TreeSet<>(result.metadata().comparator);
-            toRemove.add(clustering);
+                Row row = (Row) unfiltered;
+                if (!canRemoveRow(row, columns.regulars, sstableTimestamp))
+                    continue;
+
+                if (toRemove == null)
+                    toRemove = new TreeSet<>(result.metadata().comparator);
+                toRemove.add(row.clustering());
+            }
         }
 
         if (!removeStatic && toRemove == null)
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index ff63aca..ef9ceff 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 /**
@@ -138,28 +137,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
 
     public UnfilteredRowIterator getUnfilteredRowIterator(final ColumnFilter columnFilter, final Partition partition)
     {
-        final Iterator<Clustering<?>> clusteringIter = clusteringsInQueryOrder.iterator();
-        final SearchIterator<Clustering<?>, Row> searcher = partition.searchIterator(columnFilter, reversed);
-
-        return new AbstractUnfilteredRowIterator(partition.metadata(),
-                                                 partition.partitionKey(),
-                                                 partition.partitionLevelDeletion(),
-                                                 columnFilter.fetchedColumns(),
-                                                 searcher.next(Clustering.STATIC_CLUSTERING),
-                                                 reversed,
-                                                 partition.stats())
-        {
-            protected Unfiltered computeNext()
-            {
-                while (clusteringIter.hasNext())
-                {
-                    Row row = searcher.next(clusteringIter.next());
-                    if (row != null)
-                        return row;
-                }
-                return endOfData();
-            }
-        };
+        return partition.unfilteredIterator(columnFilter, clusteringsInQueryOrder, isReversed());
     }
 
     public boolean shouldInclude(SSTableReader sstable)
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 3059eb4..44dc0b0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -18,7 +18,11 @@
 */
 package org.apache.cassandra.db.partitions;
 
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.NavigableSet;
+
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -108,10 +112,35 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 
     public Row getRow(Clustering<?> clustering)
     {
-        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
-        // Note that for statics, this will never return null, this will return an empty row. However,
-        // it's more consistent for this method to return null if we don't really have a static row.
-        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+        ColumnFilter columns = ColumnFilter.selection(columns());
+        Holder holder = holder();
+
+        if (clustering == Clustering.STATIC_CLUSTERING)
+        {
+            // Note that for statics, this will never return null, this will return an empty row. However,
+            // it's more consistent for this method to return null if we don't really have a static row.
+            Row staticRow = staticRow(holder, columns, true);
+            return staticRow.isEmpty() ? null : staticRow;
+        }
+
+        final Row row = (Row) BTree.find(holder.tree, metadata().comparator, clustering);
+        DeletionTime activeDeletion = holder.deletionInfo.getPartitionDeletion();
+        RangeTombstone rt = holder.deletionInfo.rangeCovering(clustering);
+
+        if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+            activeDeletion = rt.deletionTime();
+
+
+        if (row == null)
+        {
+            // this means our partition level deletion supersedes all other deletions and we don't have to keep the row deletions
+            if (activeDeletion == holder.deletionInfo.getPartitionDeletion())
+                return null;
+            // no need to check activeDeletion.isLive here - if anything supersedes the partitionDeletion
+            // it must be non-live
+            return BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
+        }
+        return row.filter(columns, activeDeletion, true, metadata());
     }
 
     private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
@@ -124,43 +153,17 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         return row == null ? Rows.EMPTY_STATIC_ROW : row;
     }
 
-    public SearchIterator<Clustering<?>, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+    @Override
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed)
     {
-        // TODO: we could optimize comparison for "NativeRow" à la #6755
-        final Holder current = holder();
-        return new SearchIterator<Clustering<?>, Row>()
+        Row staticRow = staticRow(holder(), selection, false);
+        if (clusteringsInQueryOrder.isEmpty())
         {
-            private final SearchIterator<Clustering, Row> rawIter = BTree.slice(current.tree, metadata().comparator, desc(reversed));
-            private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
-
-            public Row next(Clustering<?> clustering)
-            {
-                if (clustering == Clustering.STATIC_CLUSTERING)
-                    return staticRow(current, columns, true);
-
-                Row row = rawIter.next(clustering);
-                RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
-
-                // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to to row
-                // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion
-                // to carry the proper deletion on the row.
-                DeletionTime activeDeletion = partitionDeletion;
-                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
-                    activeDeletion = rt.deletionTime();
-
-                if (row == null)
-                {
-                    // this means our partition level deletion superseedes all other deletions and we don't have to keep the row deletions
-                    if (activeDeletion == partitionDeletion)
-                        return null;
-                    // no need to check activeDeletion.isLive here - if anything superseedes the partitionDeletion
-                    // it must be non-live
-                    return BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
-                }
+            DeletionTime partitionDeletion = holder().deletionInfo.getPartitionDeletion();
+            return UnfilteredRowIterators.noRowsIterator(metadata(), partitionKey(), staticRow, partitionDeletion, reversed);
+        }
 
-                return row.filter(columns, activeDeletion, true, metadata());
-            }
-        };
+        return new ClusteringsIterator(selection, clusteringsInQueryOrder, reversed, holder(), staticRow);
     }
 
     public UnfilteredRowIterator unfilteredIterator()
@@ -216,8 +219,8 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
                   AbstractBTreePartition.this.partitionKey(),
                   current.deletionInfo.getPartitionDeletion(),
                   selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
-                                              // it would also be more precise to return the intersection of the selection and current.columns,
-                                              // but its probably not worth spending time on computing that.
+                  // it would also be more precise to return the intersection of the selection and current.columns,
+                  // but its probably not worth spending time on computing that.
                   staticRow,
                   isReversed,
                   current.stats);
@@ -226,18 +229,14 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         }
     }
 
-    public class SlicesIterator extends AbstractIterator
+    private class SlicesIterator extends AbstractIterator
     {
         private final Slices slices;
 
         private int idx;
         private Iterator<Unfiltered> currentSlice;
 
-        private SlicesIterator(ColumnFilter selection,
-                               Slices slices,
-                               boolean isReversed,
-                               Holder current,
-                               Row staticRow)
+        private SlicesIterator(ColumnFilter selection, Slices slices, boolean isReversed, Holder current, Row staticRow)
         {
             super(current, staticRow, selection, isReversed);
             this.slices = slices;
@@ -265,6 +264,59 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         }
     }
 
+    private class ClusteringsIterator extends AbstractIterator
+    {
+        private final Iterator<Clustering<?>> clusteringsInQueryOrder;
+        private final SearchIterator<Clustering<?>, Row> rowSearcher;
+
+        private Iterator<Unfiltered> currentIterator;
+
+        private ClusteringsIterator(ColumnFilter selection,
+                                    NavigableSet<Clustering<?>> clusteringsInQueryOrder,
+                                    boolean isReversed,
+                                    Holder current,
+                                    Row staticRow)
+        {
+            super(current, staticRow, selection, isReversed);
+
+            this.clusteringsInQueryOrder = clusteringsInQueryOrder.iterator();
+            this.rowSearcher = BTree.slice(current.tree, metadata().comparator, desc(isReversed));
+        }
+
+        protected Unfiltered computeNext()
+        {
+            while (true)
+            {
+                if (currentIterator == null)
+                {
+                    if (!clusteringsInQueryOrder.hasNext())
+                        return endOfData();
+
+                    currentIterator = nextIterator(clusteringsInQueryOrder.next());
+                }
+
+                if (currentIterator != null && currentIterator.hasNext())
+                    return currentIterator.next();
+
+                currentIterator = null;
+            }
+        }
+
+        private Iterator<Unfiltered> nextIterator(Clustering<?> next)
+        {
+            Row nextRow = rowSearcher.next(next);
+            // rangeCovering() will return original RT covering clustering key, but we want to generate fake RT with
+            // given clustering bound to be consistent with fake RT generated from sstable read.
+            Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(Slice.make(next), isReverseOrder());
+
+            if (nextRow == null && !deleteIter.hasNext())
+                return null;
+
+            Iterator<Row> rowIterator = nextRow == null ? Collections.emptyIterator() : Iterators.singletonIterator(nextRow);
+            return merge(rowIterator, deleteIter, selection, isReverseOrder, current, staticRow);
+        }
+    }
+
     protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
     {
         return build(iterator, initialRowCapacity, true);
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index ca20e5f..ed635cd 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -32,7 +33,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -224,15 +224,15 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
     }
 
     @Override
-    public SearchIterator<Clustering<?>, Row> searchIterator(ColumnFilter columns, boolean reversed)
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
     {
-        return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed));
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
     }
 
     @Override
-    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed)
     {
-        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, clusteringsInQueryOrder, reversed));
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 2ee9c6d..b6297a1 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -17,12 +17,15 @@
  */
 package org.apache.cassandra.db.partitions;
 
+import java.util.NavigableSet;
+
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Slices;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.utils.SearchIterator;
 
 /**
  * In-memory representation of a Partition.
@@ -49,13 +52,11 @@ public interface Partition
 
     /**
      * Returns the row corresponding to the provided clustering, or null if there is not such row.
+     *
+     * @param clustering clustering key to search
+     * @return row corresponding to the clustering, it's either null or non-empty row.
      */
-    public Row getRow(Clustering<?> clustering);
-
-    /**
-     * Returns an iterator that allows to search specific rows efficiently.
-     */
-    public SearchIterator<Clustering<?>, Row> searchIterator(ColumnFilter columns, boolean reversed);
+    public @Nullable Row getRow(Clustering<?> clustering);
 
     /**
      * Returns an UnfilteredRowIterator over all the rows/RT contained by this partition.
@@ -67,4 +68,10 @@ public interface Partition
      * selected by the provided slices.
      */
     public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed);
+
+    /**
+     * Returns an UnfilteredRowIterator over the rows/RT contained by this partition
+     * selected by the provided clusterings.
+     */
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed);
 }
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 4a03af2..e8476dd 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -327,7 +327,8 @@ public class BTreeRow extends AbstractRow
         Map<ByteBuffer, DroppedColumn> droppedColumns = metadata.droppedColumns;
 
         boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic());
-        boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
+        // When merging sstable data in Row.Merger#merge(), rowDeletion is removed if it doesn't supersede activeDeletion.
+        boolean mayHaveShadowed = !activeDeletion.isLive() && !deletion.time().supersedes(activeDeletion);
 
         if (!mayFilterColumns && !mayHaveShadowed && droppedColumns.isEmpty())
             return this;
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index d7fa37a..d70ac78 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -158,8 +158,8 @@ public interface RangeTombstoneMarker extends Unfiltered
                 return DeletionTime.LIVE;
 
             DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
-            // it's only open in the merged iterator if it's not shadowed by the partition level deletion
-            return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime;
+            // it's only open in the merged iterator if it supersedes the partition level deletion
+            return !biggestDeletionTime.supersedes(partitionDeletion) ? DeletionTime.LIVE : biggestDeletionTime;
         }
 
         private void updateOpenMarkers()
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
index 0053ec2..cd58daa 100644
--- a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
@@ -161,7 +161,9 @@ public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
         while (nextRange == null && ranges.hasNext())
         {
             nextRange = ranges.next();
-            if ((removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime()))
+            // partition deletion will shadow range tombstone if partition deletion time is greater than or equal to range
+            // tombstone time.
+            if ((removeShadowedData && !nextRange.deletionTime().supersedes(partitionLevelDeletion()))
                 || nextRange.deletedSlice().isEmpty(metadata.comparator))
                 nextRange = null;
         }
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 05f0883..12168ec 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -20,36 +20,31 @@
  */
 package org.apache.cassandra.db;
 
-import static org.junit.Assert.*;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
@@ -73,15 +68,18 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class SinglePartitionSliceCommandTest
 {
-    private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class);
-
     private static final String KEYSPACE = "ks";
     private static final String TABLE = "tbl";
 
@@ -171,7 +169,6 @@ public class SinglePartitionSliceCommandTest
 
     private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice)
     {
-        boolean isTombstone = flush || isSlice;
         int deletionTime = 5;
         int ck1 = 1;
         int uniqueCk1 = 2;
@@ -203,42 +200,24 @@ public class SinglePartitionSliceCommandTest
         while (partition.hasNext())
         {
             Unfiltered unfiltered = partition.next();
-            if (isTombstone)
-            {
-                assertTrue(unfiltered.isRangeTombstoneMarker());
-                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
-
-                // check if it's open-close pair
-                assertTrue(marker.isOpen(false) == open);
-                // check deletion time same as Range Deletion
-                if (open)
-                    assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt());
-                else
-                    assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt());
 
-                // check clustering values
-                Clustering<?> clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
-                for (int i = 0; i < CFM_SLICES.comparator.size(); i++)
-                {
-                    int cmp = CFM_SLICES.comparator.compareComponent(i, clustering, marker.clustering());
-                    assertEquals(0, cmp);
-                }
-                open = !open;
-            }
-            else
-            {
-                // deleted row
-                assertTrue(unfiltered.isRow());
-                Row row = (Row) unfiltered;
-                assertEquals(deletionTime, row.deletion().time().markedForDeleteAt());
-                assertEquals(0, row.columnCount()); // no btree
-            }
+            assertTrue(unfiltered.isRangeTombstoneMarker());
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+
+            // check if it's open-close pair
+            assertEquals(open, marker.isOpen(false));
+            // check deletion time same as Range Deletion
+            DeletionTime delete = (open ? marker.openDeletionTime(false) : marker.closeDeletionTime(false));;
+            assertEquals(deletionTime, delete.markedForDeleteAt());
+
+            // check clustering values
+            Clustering<?> clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
+            assertArrayEquals(clustering.getRawValues(), marker.clustering().getBufferArray());
+
+            open = !open;
             count++;
         }
-        if (isTombstone)
-            assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
-        else
-            assertEquals(uniqueCk2, count);
+        assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
     }
 
     private void checkForS(UnfilteredPartitionIterator pi)
@@ -315,6 +294,149 @@ public class SinglePartitionSliceCommandTest
         }
     }
 
+    /**
+     * Make sure point, slice and partition reads over a range tombstone return the same physical data structure
+     * regardless of whether the data is in a memtable or an sstable, so that we can produce the same digest.
+     */
+    @Test
+    public void testReadOnRangeTombstoneMarker()
+    {
+        QueryProcessor.executeOnceInternal("CREATE TABLE IF NOT EXISTS ks.test_read_rt (k int, c1 int, c2 int, c3 int, v int, primary key (k, c1, c2, c3))");
+        TableMetadata metadata = Schema.instance.getTableMetadata("ks", "test_read_rt");
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+
+        String template = "SELECT * FROM ks.test_read_rt %s";
+        String pointRead = "WHERE k=1 and c1=1 and c2=1 and c3=1"; // every clustering column restricted
+        String sliceReadC1C2 = "WHERE k=1 and c1=1 and c2=1"; // c3 left open
+        String sliceReadC1 = "WHERE k=1 and c1=1"; // c2 and c3 left open
+        String partitionRead = "WHERE k=1"; // no clustering restriction
+
+        for (String postfix : Arrays.asList(pointRead, sliceReadC1C2, sliceReadC1, partitionRead))
+        {
+            String query = String.format(template, postfix);
+            cfs.truncateBlocking(); // start every iteration from an empty table
+            QueryProcessor.executeOnceInternal("DELETE FROM ks.test_read_rt USING TIMESTAMP 10 WHERE k=1 AND c1=1"); // deletes the whole c1=1 range
+
+            List<Unfiltered> memtableUnfiltereds = assertQueryReturnsSingleRT(query); // read before the flush
+            cfs.forceBlockingFlush();
+            List<Unfiltered> sstableUnfiltereds = assertQueryReturnsSingleRT(query); // same read after the flush
+
+            String errorMessage = String.format("Expected %s but got %s with postfix '%s'",
+                                                toString(memtableUnfiltereds, metadata),
+                                                toString(sstableUnfiltereds, metadata),
+                                                postfix);
+            assertEquals(errorMessage, memtableUnfiltereds, sstableUnfiltereds); // both reads must yield equal unfiltereds
+        }
+    }
+
+    /**
+     * A row deletion whose timestamp ties with the partition deletion should not be returned by the read, and the result should be the same with and without flushing
+     */
+    @Test
+    public void testPartitionDeletionRowDeletionTie()
+    {
+        QueryProcessor.executeOnceInternal("CREATE TABLE ks.partition_row_deletion (k int, c int, v int, primary key (k, c))");
+        TableMetadata metadata = Schema.instance.getTableMetadata("ks", "partition_row_deletion");
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+        cfs.disableAutoCompaction();
+
+        BiFunction<Boolean, Boolean, List<Unfiltered>> tester = (flush, multiSSTable)-> // runs one scenario and returns what the read produced
+        {
+            cfs.truncateBlocking();
+            QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_row_deletion USING TIMESTAMP 10 WHERE k=1"); // partition deletion
+            if (flush && multiSSTable)
+                cfs.forceBlockingFlush(); // flush the partition deletion separately from the row deletion
+            QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_row_deletion USING TIMESTAMP 10 WHERE k=1 and c=1"); // row deletion, same timestamp
+            if (flush)
+                cfs.forceBlockingFlush();
+
+            QueryProcessor.executeOnceInternal("INSERT INTO ks.partition_row_deletion(k,c,v) VALUES(1,1,1) using timestamp 11"); // newer than both deletions
+            if (flush)
+            {
+                cfs.forceBlockingFlush();
+                try
+                {
+                    cfs.forceMajorCompaction();
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e); // rethrown unchecked so the lambda fits BiFunction
+                }
+            }
+
+            try (UnfilteredRowIterator partition = getIteratorFromSinglePartition("SELECT * FROM ks.partition_row_deletion where k=1 and c=1"))
+            {
+                assertEquals(10, partition.partitionLevelDeletion().markedForDeleteAt()); // the partition deletion itself is still reported
+                return toUnfiltereds(partition);
+            }
+        };
+
+        List<Unfiltered> memtableUnfiltereds = tester.apply(false, false); // nothing flushed
+        List<Unfiltered> singleSSTableUnfiltereds = tester.apply(true, false); // both deletions written by the same flush
+        List<Unfiltered> multiSSTableUnfiltereds = tester.apply(true, true); // partition deletion flushed before the row deletion
+
+        assertEquals(1, singleSSTableUnfiltereds.size()); // exactly one unfiltered is expected
+        String errorMessage = String.format("Expected %s but got %s", toString(memtableUnfiltereds, metadata), toString(singleSSTableUnfiltereds, metadata));
+        assertEquals(errorMessage, memtableUnfiltereds, singleSSTableUnfiltereds);
+        errorMessage = String.format("Expected %s but got %s", toString(singleSSTableUnfiltereds, metadata), toString(multiSSTableUnfiltereds, metadata));
+        assertEquals(errorMessage, singleSSTableUnfiltereds, multiSSTableUnfiltereds);
+        memtableUnfiltereds.forEach(u -> assertTrue("Expected no row deletion, but got " + u.toString(metadata, true), ((Row) u).deletion().isLive())); // rows must carry no row-level deletion
+    }
+
+    /**
+     * A range deletion whose timestamp ties with the partition deletion should not be returned by the read, and the result should be the same with and without flushing
+     */
+    @Test
+    public void testPartitionDeletionRangeDeletionTie()
+    {
+        QueryProcessor.executeOnceInternal("CREATE TABLE ks.partition_range_deletion (k int, c1 int, c2 int, v int, primary key (k, c1, c2))");
+        TableMetadata metadata = Schema.instance.getTableMetadata("ks", "partition_range_deletion");
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+        cfs.disableAutoCompaction();
+
+        BiFunction<Boolean, Boolean, List<Unfiltered>> tester = (flush, multiSSTable) -> // runs one scenario and returns what the read produced
+        {
+            cfs.truncateBlocking();
+            QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_range_deletion USING TIMESTAMP 10 WHERE k=1"); // partition deletion
+            if (flush && multiSSTable)
+                cfs.forceBlockingFlush(); // flush the partition deletion separately from the range deletion
+            QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_range_deletion USING TIMESTAMP 10 WHERE k=1 and c1=1"); // deletes the c1=1 range, same timestamp
+            if (flush)
+                cfs.forceBlockingFlush();
+
+            QueryProcessor.executeOnceInternal("INSERT INTO ks.partition_range_deletion(k,c1,c2,v) VALUES(1,1,1,1) using timestamp 11"); // newer than both deletions
+            if (flush)
+            {
+                cfs.forceBlockingFlush();
+                try
+                {
+                    cfs.forceMajorCompaction();
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e); // rethrown unchecked so the lambda fits BiFunction
+                }
+            }
+
+            try (UnfilteredRowIterator partition = getIteratorFromSinglePartition("SELECT * FROM ks.partition_range_deletion where k=1 and c1=1 and c2=1"))
+            {
+                assertEquals(10, partition.partitionLevelDeletion().markedForDeleteAt()); // the partition deletion itself is still reported
+                return toUnfiltereds(partition);
+            }
+        };
+
+        List<Unfiltered> memtableUnfiltereds = tester.apply(false, false); // nothing flushed
+        List<Unfiltered> singleSSTableUnfiltereds = tester.apply(true, false); // both deletions written by the same flush
+        List<Unfiltered> multiSSTableUnfiltereds = tester.apply(true, true); // partition deletion flushed before the range deletion
+
+        assertEquals(1, singleSSTableUnfiltereds.size()); // exactly one unfiltered is expected
+        String errorMessage = String.format("Expected %s but got %s", toString(memtableUnfiltereds, metadata), toString(singleSSTableUnfiltereds, metadata));
+        assertEquals(errorMessage, memtableUnfiltereds, singleSSTableUnfiltereds);
+        errorMessage = String.format("Expected %s but got %s", toString(singleSSTableUnfiltereds, metadata), toString(multiSSTableUnfiltereds, metadata));
+        assertEquals(errorMessage, singleSSTableUnfiltereds, multiSSTableUnfiltereds);
+        memtableUnfiltereds.forEach(u -> assertTrue("Expected row, but got " + u.toString(metadata, true), u.isRow())); // no range tombstone marker may be returned
+    }
+
     @Test
     public void toCQLStringIsSafeToCall() throws IOException
     {
@@ -335,12 +457,10 @@ public class SinglePartitionSliceCommandTest
         Assert.assertFalse(ret.isEmpty());
     }
 
-
-    public static List<Unfiltered> getUnfilteredsFromSinglePartition(String q)
+    public static UnfilteredRowIterator getIteratorFromSinglePartition(String q)
     {
         SelectStatement stmt = (SelectStatement) QueryProcessor.parseStatement(q).prepare(ClientState.forInternalCalls());
 
-        List<Unfiltered> unfiltereds = new ArrayList<>();
         SinglePartitionReadQuery.Group<SinglePartitionReadCommand> query = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) stmt.getQuery(QueryOptions.DEFAULT, 0);
         Assert.assertEquals(1, query.queries.size());
         SinglePartitionReadCommand command = Iterables.getOnlyElement(query.queries);
@@ -348,20 +468,26 @@ public class SinglePartitionSliceCommandTest
              UnfilteredPartitionIterator partitions = command.executeLocally(controller))
         {
             assert partitions.hasNext();
-            try (UnfilteredRowIterator partition = partitions.next())
-            {
-                while (partition.hasNext())
-                {
-                    Unfiltered next = partition.next();
-                    unfiltereds.add(next);
-                }
-            }
+            UnfilteredRowIterator partition = partitions.next();
             assert !partitions.hasNext();
+            return partition;
         }
-        return unfiltereds;
     }
 
-    private static void assertQueryReturnsSingleRT(String query)
+    public static List<Unfiltered> getUnfilteredsFromSinglePartition(String q)
+    {
+        try (UnfilteredRowIterator partition = getIteratorFromSinglePartition(q)) // closed once its content has been copied out
+        {
+            return toUnfiltereds(partition); // list of everything the single partition's iterator yields
+        }
+    }
+
+    private static List<Unfiltered> toUnfiltereds(UnfilteredRowIterator partition)
+    {
+        return Lists.newArrayList(partition); // copies the iterator's remaining elements into a list; does not close it
+    }
+
+    private static List<Unfiltered> assertQueryReturnsSingleRT(String query)
     {
         List<Unfiltered> unfiltereds = getUnfilteredsFromSinglePartition(query);
         Assert.assertEquals(2, unfiltereds.size());
@@ -369,6 +495,7 @@ public class SinglePartitionSliceCommandTest
         Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(0)).isOpen(false));
         Assert.assertTrue(unfiltereds.get(1).isRangeTombstoneMarker());
         Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(1)).isClose(false));
+        return unfiltereds;
     }
 
     private static ByteBuffer bb(int v)
@@ -411,4 +538,9 @@ public class SinglePartitionSliceCommandTest
         assertQueryReturnsSingleRT("SELECT * FROM ks.legacy_mc_inaccurate_min_max WHERE k=100 AND c1=3 AND c2=2 AND c3=2"); // clustering names
 
     }
+
+    private String toString(List<Unfiltered> unfiltereds, TableMetadata metadata)
+    {
+        return unfiltereds.stream().map(u -> u.toString(metadata, true)).collect(Collectors.toList()).toString(); // "[a, b, ...]" rendering used in assertion messages
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
index effc54f..5b3681e 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
@@ -50,7 +50,6 @@ import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.SearchIterator;
 
 public class PartitionImplementationTest
 {
@@ -331,11 +330,11 @@ public class PartitionImplementationTest
         assertIteratorsEqual(streamOf(invert(slice(sortedContent, multiSlices))).map(colFilter).iterator(),
                              partition.unfilteredIterator(cf, multiSlices, true));
 
-        // search iterator
-        testSearchIterator(sortedContent, partition, ColumnFilter.all(metadata), false);
-        testSearchIterator(sortedContent, partition, cf, false);
-        testSearchIterator(sortedContent, partition, ColumnFilter.all(metadata), true);
-        testSearchIterator(sortedContent, partition, cf, true);
+        // clustering iterator
+        testClusteringsIterator(sortedContent, partition, ColumnFilter.all(metadata), false);
+        testClusteringsIterator(sortedContent, partition, cf, false);
+        testClusteringsIterator(sortedContent, partition, ColumnFilter.all(metadata), true);
+        testClusteringsIterator(sortedContent, partition, cf, true);
 
         // sliceable iter
         testSlicingOfIterators(sortedContent, partition, ColumnFilter.all(metadata), false);
@@ -344,30 +343,34 @@ public class PartitionImplementationTest
         testSlicingOfIterators(sortedContent, partition, cf, true);
     }
 
-    void testSearchIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
+    private void testClusteringsIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
     {
-        SearchIterator<Clustering<?>, Row> searchIter = partition.searchIterator(cf, reversed);
-        int pos = reversed ? KEY_RANGE : 0;
-        int mul = reversed ? -1 : 1;
-        boolean started = false;
-        while (pos < KEY_RANGE)
+        Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, metadata) : x; // filters rows by cf, passes anything else through
+        NavigableSet<Clustering<?>> clusteringsInQueryOrder = makeClusterings(reversed);
+
+        // fetch each clustering in turn and compare with the matching slice of the reference content
+        for (Clustering<?> clustering : clusteringsInQueryOrder)
+        {
+            NavigableSet<Clustering<?>> single = new TreeSet<>(metadata.comparator);
+            single.add(clustering);
+            try (UnfilteredRowIterator slicedIter = partition.unfilteredIterator(cf, single, reversed))
+            {
+                assertIteratorsEqual(streamOf(directed(slice(sortedContent, Slice.make(clustering)), reversed)).map(colFilter).iterator(),
+                                     slicedIter);
+            }
+        }
+
+        // Fetch all clusterings at once; NOTE(review): unlike above, the expected content is not mapped through colFilter - confirm intended
+        try (UnfilteredRowIterator slicedIter = partition.unfilteredIterator(cf, clusteringsInQueryOrder, reversed))
         {
-            int skip = rand.nextInt(KEY_RANGE / 10);
-            pos += skip * mul;
-            Clustering<?> cl = clustering(pos);
-            Row row = searchIter.next(cl);  // returns row with deletion, incl. empty row with deletion
-            if (row == null && skip == 0 && started)    // allowed to return null if already reported row
-                continue;
-            started = true;
-            Row expected = getRow(sortedContent, cl);
-            assertEquals(expected == null, row == null);
-            if (row == null)
-                continue;
-            assertRowsEqual(expected.filter(cf, metadata), row);
+            List<Iterator<? extends Clusterable>> clusterableIterators = new ArrayList<>();
+            clusteringsInQueryOrder.forEach(clustering -> clusterableIterators.add(directed(slice(sortedContent, Slice.make(clustering)), reversed)));
+
+            assertIteratorsEqual(Iterators.concat(clusterableIterators.toArray(new Iterator[0])), slicedIter); // expected = per-clustering slices concatenated in query order
         }
     }
 
-    Slices makeSlices()
+    private Slices makeSlices()
     {
         int pos = 0;
         Slices.Builder builder = new Slices.Builder(metadata.comparator);
@@ -385,7 +388,20 @@ public class PartitionImplementationTest
         return builder.build();
     }
 
-    void testSlicingOfIterators(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
+    private NavigableSet<Clustering<?>> makeClusterings(boolean reversed)
+    {
+        int pos = 0;
+        NavigableSet<Clustering<?>> clusterings = new TreeSet<>(reversed ? metadata.comparator.reversed() : metadata.comparator); // ordered to match the query direction
+        while (pos <= KEY_RANGE) // pos is advanced before the add, so the last clustering may lie beyond KEY_RANGE
+        {
+            int skip = rand.nextInt(KEY_RANGE / 10) * ((rand.nextInt(3) + 2) / 3); // multiplier is 0 one time in three, else 1: increased chance of getting 0
+            pos += skip;
+            clusterings.add(clustering(pos)); // a zero skip re-adds the same position; the set keeps one entry as long as the comparator treats them as equal
+        }
+        return clusterings;
+    }
+
+    private void testSlicingOfIterators(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
     {
         Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, metadata) : x;
         Slices slices = makeSlices();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org