Posted to commits@cassandra.apache.org by ja...@apache.org on 2020/11/02 18:37:06 UTC
[cassandra] branch trunk updated: Produce consistent tombstone to
avoid digest mismatch:
This is an automated email from the ASF dual-hosted git repository.
jasonstack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56e697d Produce consistent tombstone to avoid digest mismatch:
56e697d is described below
commit 56e697dc124e9d94581052cca1ba97ad9b1044c5
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Thu Mar 12 00:36:02 2020 +0800
Produce consistent tombstone to avoid digest mismatch:
* fix issue where a memtable read creates a row deletion while an sstable read creates a range tombstone marker
* return a range tombstone marker for memtable reads instead of a row deletion
* remove a range tombstone marker or row deletion when it doesn't supersede the partition deletion
patch by Zhao Yang; reviewed by Andres de la Peña and Marcus Eriksson
for CASSANDRA-15369
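
For context, a minimal reproduction of the mismatch, distilled from the testReadOnRangeTombstoneMarker test added below (cfs and the getUnfilteredsFromSinglePartition helper come from that test; the final comparison is illustrative):

    // A range deletion covering c1=1 is written to the memtable.
    QueryProcessor.executeOnceInternal("DELETE FROM ks.test_read_rt USING TIMESTAMP 10 WHERE k=1 AND c1=1");

    // Point read served from the memtable: before this patch it returned a
    // synthetic row deletion for (c1=1, c2=1, c3=1).
    List<Unfiltered> fromMemtable = getUnfilteredsFromSinglePartition(
        "SELECT * FROM ks.test_read_rt WHERE k=1 AND c1=1 AND c2=1 AND c3=1");

    cfs.forceBlockingFlush();

    // The same read served from the sstable: an open/close pair of range
    // tombstone markers. Replicas at different flush states therefore
    // digested different unfiltereds for identical data.
    List<Unfiltered> fromSSTable = getUnfilteredsFromSinglePartition(
        "SELECT * FROM ks.test_read_rt WHERE k=1 AND c1=1 AND c2=1 AND c3=1");

    // With this patch both reads return the same marker pair.
    assertEquals(fromMemtable, fromSSTable);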
---
CHANGES.txt | 2 +-
.../apache/cassandra/cql3/UpdateParameters.java | 3 +-
.../cassandra/db/SinglePartitionReadCommand.java | 26 ++-
.../db/filter/ClusteringIndexNamesFilter.java | 24 +-
.../db/partitions/AbstractBTreePartition.java | 144 ++++++++----
.../db/partitions/AtomicBTreePartition.java | 10 +-
.../apache/cassandra/db/partitions/Partition.java | 21 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 3 +-
.../cassandra/db/rows/RangeTombstoneMarker.java | 4 +-
.../db/rows/RowAndDeletionMergeIterator.java | 4 +-
.../db/SinglePartitionSliceCommandTest.java | 250 ++++++++++++++++-----
.../db/partition/PartitionImplementationTest.java | 68 +++---
12 files changed, 375 insertions(+), 184 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 41b23b6..3586d95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,8 @@
4.0-beta4
+ * Produce consistent tombstone for reads to avoid digest mismatch (CASSANDRA-15369)
* Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235)
* Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
* Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
-
4.0-beta3
* Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229)
* Fail truncation requests when they fail on a replica (CASSANDRA-16208)
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index f90958b..84123f3 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -203,7 +202,7 @@ public class UpdateParameters
return null;
Partition partition = prefetchedRows.get(key);
- Row prefetchedRow = partition == null ? null : partition.searchIterator(ColumnFilter.selection(partition.columns()), false).next(clustering);
+ Row prefetchedRow = partition == null ? null : partition.getRow(clustering);
// We need to apply the pending mutations to return the row in its current state
Row pendingMutations = builder.copy().build();
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 62d67f1..0a0c21c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
/**
@@ -928,8 +927,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
if (result == null)
return filter;
- SearchIterator<Clustering<?>, Row> searchIter = result.searchIterator(columnFilter(), false);
-
RegularAndStaticColumns columns = columnFilter().fetchedColumns();
NavigableSet<Clustering<?>> clusterings = filter.requestedRows();
@@ -940,20 +937,27 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
boolean removeStatic = false;
if (!columns.statics.isEmpty())
{
- Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+ Row staticRow = result.getRow(Clustering.STATIC_CLUSTERING);
removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
}
NavigableSet<Clustering<?>> toRemove = null;
- for (Clustering<?> clustering : clusterings)
+ try (UnfilteredRowIterator iterator = result.unfilteredIterator(columnFilter(), clusterings, false))
{
- Row row = searchIter.next(clustering);
- if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
- continue;
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered == null || !unfiltered.isRow())
+ continue;
- if (toRemove == null)
- toRemove = new TreeSet<>(result.metadata().comparator);
- toRemove.add(clustering);
+ Row row = (Row) unfiltered;
+ if (!canRemoveRow(row, columns.regulars, sstableTimestamp))
+ continue;
+
+ if (toRemove == null)
+ toRemove = new TreeSet<>(result.metadata().comparator);
+ toRemove.add(row.clustering());
+ }
}
if (!removeStatic && toRemove == null)
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index ff63aca..ef9ceff 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
/**
@@ -138,28 +137,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
public UnfilteredRowIterator getUnfilteredRowIterator(final ColumnFilter columnFilter, final Partition partition)
{
- final Iterator<Clustering<?>> clusteringIter = clusteringsInQueryOrder.iterator();
- final SearchIterator<Clustering<?>, Row> searcher = partition.searchIterator(columnFilter, reversed);
-
- return new AbstractUnfilteredRowIterator(partition.metadata(),
- partition.partitionKey(),
- partition.partitionLevelDeletion(),
- columnFilter.fetchedColumns(),
- searcher.next(Clustering.STATIC_CLUSTERING),
- reversed,
- partition.stats())
- {
- protected Unfiltered computeNext()
- {
- while (clusteringIter.hasNext())
- {
- Row row = searcher.next(clusteringIter.next());
- if (row != null)
- return row;
- }
- return endOfData();
- }
- };
+ return partition.unfilteredIterator(columnFilter, clusteringsInQueryOrder, isReversed());
}
public boolean shouldInclude(SSTableReader sstable)
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 3059eb4..44dc0b0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -18,7 +18,11 @@
*/
package org.apache.cassandra.db.partitions;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.NavigableSet;
+
+import com.google.common.collect.Iterators;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -108,10 +112,35 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
public Row getRow(Clustering<?> clustering)
{
- Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
- // Note that for statics, this will never return null, this will return an empty row. However,
- // it's more consistent for this method to return null if we don't really have a static row.
- return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+ ColumnFilter columns = ColumnFilter.selection(columns());
+ Holder holder = holder();
+
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ {
+ // Note that for statics, this will never return null, this will return an empty row. However,
+ // it's more consistent for this method to return null if we don't really have a static row.
+ Row staticRow = staticRow(holder, columns, true);
+ return staticRow.isEmpty() ? null : staticRow;
+ }
+
+ final Row row = (Row) BTree.find(holder.tree, metadata().comparator, clustering);
+ DeletionTime activeDeletion = holder.deletionInfo.getPartitionDeletion();
+ RangeTombstone rt = holder.deletionInfo.rangeCovering(clustering);
+
+ if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+ activeDeletion = rt.deletionTime();
+
+
+ if (row == null)
+ {
+ // this means our partition level deletion supersedes all other deletions and we don't have to keep the row deletions
+ if (activeDeletion == holder.deletionInfo.getPartitionDeletion())
+ return null;
+ // no need to check activeDeletion.isLive here - if anything supersedes the partitionDeletion
+ // it must be non-live
+ return BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
+ }
+ return row.filter(columns, activeDeletion, true, metadata());
}
private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
@@ -124,43 +153,17 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
return row == null ? Rows.EMPTY_STATIC_ROW : row;
}
- public SearchIterator<Clustering<?>, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+ @Override
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed)
{
- // TODO: we could optimize comparison for "NativeRow" à la #6755
- final Holder current = holder();
- return new SearchIterator<Clustering<?>, Row>()
+ Row staticRow = staticRow(holder(), selection, false);
+ if (clusteringsInQueryOrder.isEmpty())
{
- private final SearchIterator<Clustering, Row> rawIter = BTree.slice(current.tree, metadata().comparator, desc(reversed));
- private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
-
- public Row next(Clustering<?> clustering)
- {
- if (clustering == Clustering.STATIC_CLUSTERING)
- return staticRow(current, columns, true);
-
- Row row = rawIter.next(clustering);
- RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
-
- // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to to row
- // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion
- // to carry the proper deletion on the row.
- DeletionTime activeDeletion = partitionDeletion;
- if (rt != null && rt.deletionTime().supersedes(activeDeletion))
- activeDeletion = rt.deletionTime();
-
- if (row == null)
- {
- // this means our partition level deletion superseedes all other deletions and we don't have to keep the row deletions
- if (activeDeletion == partitionDeletion)
- return null;
- // no need to check activeDeletion.isLive here - if anything superseedes the partitionDeletion
- // it must be non-live
- return BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
- }
+ DeletionTime partitionDeletion = holder().deletionInfo.getPartitionDeletion();
+ return UnfilteredRowIterators.noRowsIterator(metadata(), partitionKey(), staticRow, partitionDeletion, reversed);
+ }
- return row.filter(columns, activeDeletion, true, metadata());
- }
- };
+ return new ClusteringsIterator(selection, clusteringsInQueryOrder, reversed, holder(), staticRow);
}
public UnfilteredRowIterator unfilteredIterator()
@@ -216,8 +219,8 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
AbstractBTreePartition.this.partitionKey(),
current.deletionInfo.getPartitionDeletion(),
selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
- // it would also be more precise to return the intersection of the selection and current.columns,
- // but its probably not worth spending time on computing that.
+ // it would also be more precise to return the intersection of the selection and current.columns,
+ // but it's probably not worth spending time on computing that.
staticRow,
isReversed,
current.stats);
@@ -226,18 +229,14 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
}
}
- public class SlicesIterator extends AbstractIterator
+ private class SlicesIterator extends AbstractIterator
{
private final Slices slices;
private int idx;
private Iterator<Unfiltered> currentSlice;
- private SlicesIterator(ColumnFilter selection,
- Slices slices,
- boolean isReversed,
- Holder current,
- Row staticRow)
+ private SlicesIterator(ColumnFilter selection, Slices slices, boolean isReversed, Holder current, Row staticRow)
{
super(current, staticRow, selection, isReversed);
this.slices = slices;
@@ -265,6 +264,59 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
}
}
+ private class ClusteringsIterator extends AbstractIterator
+ {
+ private final Iterator<Clustering<?>> clusteringsInQueryOrder;
+ private final SearchIterator<Clustering<?>, Row> rowSearcher;
+
+ private Iterator<Unfiltered> currentIterator;
+
+ private ClusteringsIterator(ColumnFilter selection,
+ NavigableSet<Clustering<?>> clusteringsInQueryOrder,
+ boolean isReversed,
+ Holder current,
+ Row staticRow)
+ {
+ super(current, staticRow, selection, isReversed);
+
+ this.clusteringsInQueryOrder = clusteringsInQueryOrder.iterator();
+ this.rowSearcher = BTree.slice(current.tree, metadata().comparator, desc(isReversed));
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (true)
+ {
+ if (currentIterator == null)
+ {
+ if (!clusteringsInQueryOrder.hasNext())
+ return endOfData();
+
+ currentIterator = nextIterator(clusteringsInQueryOrder.next());
+ }
+
+ if (currentIterator != null && currentIterator.hasNext())
+ return currentIterator.next();
+
+ currentIterator = null;
+ }
+ }
+
+ private Iterator<Unfiltered> nextIterator(Clustering<?> next)
+ {
+ Row nextRow = rowSearcher.next(next);
+ // rangeCovering() would return the original RT covering the clustering key, but we want to generate a fake RT
+ // with the given clustering bound, to be consistent with the fake RT generated from an sstable read.
+ Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(Slice.make(next), isReverseOrder());
+
+ if (nextRow == null && !deleteIter.hasNext())
+ return null;
+
+ Iterator<Row> rowIterator = nextRow == null ? Collections.emptyIterator() : Iterators.singletonIterator(nextRow);
+ return merge(rowIterator, deleteIter, selection, isReverseOrder, current, staticRow);
+ }
+ }
+
protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
{
return build(iterator, initialRowCapacity, true);
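
The deletion-resolution rule that getRow now applies can be restated compactly (a sketch only; resolveRow is a hypothetical helper mirroring the code above, not part of the patch):

    // The partition deletion is the baseline; a covering range tombstone only
    // takes over when it strictly supersedes it, so ties resolve to the
    // partition deletion and shadowed deletions are dropped.
    static Row resolveRow(Row row, Clustering<?> clustering, DeletionTime partitionDeletion,
                          RangeTombstone rt, ColumnFilter columns, TableMetadata metadata)
    {
        DeletionTime activeDeletion = partitionDeletion;
        if (rt != null && rt.deletionTime().supersedes(activeDeletion))
            activeDeletion = rt.deletionTime();

        if (row == null)
            // No stored row: synthesize a deleted row only when a range
            // tombstone strictly supersedes the partition deletion.
            return activeDeletion == partitionDeletion
                 ? null
                 : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));

        // Drop any cells and row deletion shadowed by the active deletion.
        return row.filter(columns, activeDeletion, true, metadata);
    }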
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index ca20e5f..ed635cd 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -32,7 +33,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -224,15 +224,15 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
}
@Override
- public SearchIterator<Clustering<?>, Row> searchIterator(ColumnFilter columns, boolean reversed)
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
{
- return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed));
+ return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
}
@Override
- public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed)
{
- return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
+ return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, clusteringsInQueryOrder, reversed));
}
@Override
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 2ee9c6d..b6297a1 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -17,12 +17,15 @@
*/
package org.apache.cassandra.db.partitions;
+import java.util.NavigableSet;
+
+import javax.annotation.Nullable;
+
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.utils.SearchIterator;
/**
* In-memory representation of a Partition.
@@ -49,13 +52,11 @@ public interface Partition
/**
* Returns the row corresponding to the provided clustering, or null if there is no such row.
+ *
+ * @param clustering clustering key to search
+ * @return the row corresponding to the clustering; either null or a non-empty row
*/
- public Row getRow(Clustering<?> clustering);
-
- /**
- * Returns an iterator that allows to search specific rows efficiently.
- */
- public SearchIterator<Clustering<?>, Row> searchIterator(ColumnFilter columns, boolean reversed);
+ public @Nullable Row getRow(Clustering<?> clustering);
/**
* Returns an UnfilteredRowIterator over all the rows/RT contained by this partition.
@@ -67,4 +68,10 @@ public interface Partition
* selected by the provided slices.
*/
public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed);
+
+ /**
+ * Returns an UnfilteredRowIterator over the rows/RT contained by this partition
+ * selected by the provided clusterings.
+ */
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed);
}
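
A brief usage sketch of the new clusterings overload (a hypothetical caller; the partition and clustering values are assumed to be in scope):

    // Build the requested clusterings in query order, as
    // ClusteringIndexNamesFilter does above.
    NavigableSet<Clustering<?>> clusterings = new TreeSet<>(partition.metadata().comparator);
    clusterings.add(clustering);

    // Unlike the removed searchIterator, this iterator can emit range
    // tombstone markers covering the requested rows, not only rows.
    try (UnfilteredRowIterator iter =
             partition.unfilteredIterator(ColumnFilter.all(partition.metadata()), clusterings, false))
    {
        while (iter.hasNext())
        {
            Unfiltered u = iter.next();
            // u.isRow() for a live or deleted row, u.isRangeTombstoneMarker()
            // for the open/close bounds of a covering deletion.
        }
    }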
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 4a03af2..e8476dd 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -327,7 +327,8 @@ public class BTreeRow extends AbstractRow
Map<ByteBuffer, DroppedColumn> droppedColumns = metadata.droppedColumns;
boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic());
- boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
+ // When merging sstable data in Row.Merger#merge(), rowDeletion is removed if it doesn't supersede activeDeletion.
+ boolean mayHaveShadowed = !activeDeletion.isLive() && !deletion.time().supersedes(activeDeletion);
if (!mayFilterColumns && !mayHaveShadowed && droppedColumns.isEmpty())
return this;
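
The behavioral difference is on timestamp ties (a sketch; the DeletionTime values here are illustrative, not from the patch):

    // supersedes() is strict, so equal deletion times do not supersede
    // each other.
    int nowInSec = FBUtilities.nowInSeconds();
    DeletionTime partitionDeletion = new DeletionTime(10, nowInSec);
    DeletionTime rowDeletion = new DeletionTime(10, nowInSec);

    // Old condition: partitionDeletion.supersedes(rowDeletion) is false on a
    // tie, so the shadowed row deletion survived a memtable read.
    // New condition: !rowDeletion.supersedes(partitionDeletion) is true on a
    // tie, so the row deletion is dropped, matching Row.Merger#merge() and
    // the sstable read path.
    boolean mayHaveShadowed = !partitionDeletion.isLive()
                           && !rowDeletion.supersedes(partitionDeletion);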
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index d7fa37a..d70ac78 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -158,8 +158,8 @@ public interface RangeTombstoneMarker extends Unfiltered
return DeletionTime.LIVE;
DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
- // it's only open in the merged iterator if it's not shadowed by the partition level deletion
- return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime;
+ // it's only open in the merged iterator if it supersedes the partition level deletion
+ return !biggestDeletionTime.supersedes(partitionDeletion) ? DeletionTime.LIVE : biggestDeletionTime;
}
private void updateOpenMarkers()
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
index 0053ec2..cd58daa 100644
--- a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
@@ -161,7 +161,9 @@ public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
while (nextRange == null && ranges.hasNext())
{
nextRange = ranges.next();
- if ((removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime()))
+ // partition deletion shadows the range tombstone if the partition deletion time is greater than or equal to the range
+ // tombstone time.
+ if ((removeShadowedData && !nextRange.deletionTime().supersedes(partitionLevelDeletion()))
|| nextRange.deletedSlice().isEmpty(metadata.comparator))
nextRange = null;
}
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 05f0883..12168ec 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -20,36 +20,31 @@
*/
package org.apache.cassandra.db;
-import static org.junit.Assert.*;
-
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
@@ -73,15 +68,18 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTreeSet;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class SinglePartitionSliceCommandTest
{
- private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class);
-
private static final String KEYSPACE = "ks";
private static final String TABLE = "tbl";
@@ -171,7 +169,6 @@ public class SinglePartitionSliceCommandTest
private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice)
{
- boolean isTombstone = flush || isSlice;
int deletionTime = 5;
int ck1 = 1;
int uniqueCk1 = 2;
@@ -203,42 +200,24 @@ public class SinglePartitionSliceCommandTest
while (partition.hasNext())
{
Unfiltered unfiltered = partition.next();
- if (isTombstone)
- {
- assertTrue(unfiltered.isRangeTombstoneMarker());
- RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
-
- // check if it's open-close pair
- assertTrue(marker.isOpen(false) == open);
- // check deletion time same as Range Deletion
- if (open)
- assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt());
- else
- assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt());
- // check clustering values
- Clustering<?> clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
- for (int i = 0; i < CFM_SLICES.comparator.size(); i++)
- {
- int cmp = CFM_SLICES.comparator.compareComponent(i, clustering, marker.clustering());
- assertEquals(0, cmp);
- }
- open = !open;
- }
- else
- {
- // deleted row
- assertTrue(unfiltered.isRow());
- Row row = (Row) unfiltered;
- assertEquals(deletionTime, row.deletion().time().markedForDeleteAt());
- assertEquals(0, row.columnCount()); // no btree
- }
+ assertTrue(unfiltered.isRangeTombstoneMarker());
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+
+ // check if it's open-close pair
+ assertEquals(open, marker.isOpen(false));
+ // check deletion time same as Range Deletion
DeletionTime delete = (open ? marker.openDeletionTime(false) : marker.closeDeletionTime(false));
+ assertEquals(deletionTime, delete.markedForDeleteAt());
+
+ // check clustering values
+ Clustering<?> clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
+ assertArrayEquals(clustering.getRawValues(), marker.clustering().getBufferArray());
+
+ open = !open;
count++;
}
- if (isTombstone)
- assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
- else
- assertEquals(uniqueCk2, count);
+ assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
}
private void checkForS(UnfilteredPartitionIterator pi)
@@ -315,6 +294,149 @@ public class SinglePartitionSliceCommandTest
}
}
+ /**
+ * Make sure a point read on a range tombstone returns the same physical data structure regardless of whether
+ * the data is in a memtable or an sstable, so that we produce the same digest.
+ */
+ @Test
+ public void testReadOnRangeTombstoneMarker()
+ {
+ QueryProcessor.executeOnceInternal("CREATE TABLE IF NOT EXISTS ks.test_read_rt (k int, c1 int, c2 int, c3 int, v int, primary key (k, c1, c2, c3))");
+ TableMetadata metadata = Schema.instance.getTableMetadata("ks", "test_read_rt");
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+
+ String template = "SELECT * FROM ks.test_read_rt %s";
+ String pointRead = "WHERE k=1 and c1=1 and c2=1 and c3=1";
+ String sliceReadC1C2 = "WHERE k=1 and c1=1 and c2=1";
+ String sliceReadC1 = "WHERE k=1 and c1=1";
+ String partitionRead = "WHERE k=1";
+
+ for (String postfix : Arrays.asList(pointRead, sliceReadC1C2, sliceReadC1, partitionRead))
+ {
+ String query = String.format(template, postfix);
+ cfs.truncateBlocking();
+ QueryProcessor.executeOnceInternal("DELETE FROM ks.test_read_rt USING TIMESTAMP 10 WHERE k=1 AND c1=1");
+
+ List<Unfiltered> memtableUnfiltereds = assertQueryReturnsSingleRT(query);
+ cfs.forceBlockingFlush();
+ List<Unfiltered> sstableUnfiltereds = assertQueryReturnsSingleRT(query);
+
+ String errorMessage = String.format("Expected %s but got %s with postfix '%s'",
+ toString(memtableUnfiltereds, metadata),
+ toString(sstableUnfiltereds, metadata),
+ postfix);
+ assertEquals(errorMessage, memtableUnfiltereds, sstableUnfiltereds);
+ }
+ }
+
+ /**
+ * Partition deletion should remove the row deletion when their timestamps tie
+ */
+ @Test
+ public void testPartitionDeletionRowDeletionTie()
+ {
+ QueryProcessor.executeOnceInternal("CREATE TABLE ks.partition_row_deletion (k int, c int, v int, primary key (k, c))");
+ TableMetadata metadata = Schema.instance.getTableMetadata("ks", "partition_row_deletion");
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+ cfs.disableAutoCompaction();
+
+ BiFunction<Boolean, Boolean, List<Unfiltered>> tester = (flush, multiSSTable)->
+ {
+ cfs.truncateBlocking();
+ QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_row_deletion USING TIMESTAMP 10 WHERE k=1");
+ if (flush && multiSSTable)
+ cfs.forceBlockingFlush();
+ QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_row_deletion USING TIMESTAMP 10 WHERE k=1 and c=1");
+ if (flush)
+ cfs.forceBlockingFlush();
+
+ QueryProcessor.executeOnceInternal("INSERT INTO ks.partition_row_deletion(k,c,v) VALUES(1,1,1) using timestamp 11");
+ if (flush)
+ {
+ cfs.forceBlockingFlush();
+ try
+ {
+ cfs.forceMajorCompaction();
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ try (UnfilteredRowIterator partition = getIteratorFromSinglePartition("SELECT * FROM ks.partition_row_deletion where k=1 and c=1"))
+ {
+ assertEquals(10, partition.partitionLevelDeletion().markedForDeleteAt());
+ return toUnfiltereds(partition);
+ }
+ };
+
+ List<Unfiltered> memtableUnfiltereds = tester.apply(false, false);
+ List<Unfiltered> singleSSTableUnfiltereds = tester.apply(true, false);
+ List<Unfiltered> multiSSTableUnfiltereds = tester.apply(true, true);
+
+ assertEquals(1, singleSSTableUnfiltereds.size());
+ String errorMessage = String.format("Expected %s but got %s", toString(memtableUnfiltereds, metadata), toString(singleSSTableUnfiltereds, metadata));
+ assertEquals(errorMessage, memtableUnfiltereds, singleSSTableUnfiltereds);
+ errorMessage = String.format("Expected %s but got %s", toString(singleSSTableUnfiltereds, metadata), toString(multiSSTableUnfiltereds, metadata));
+ assertEquals(errorMessage, singleSSTableUnfiltereds, multiSSTableUnfiltereds);
+ memtableUnfiltereds.forEach(u -> assertTrue("Expected no row deletion, but got " + u.toString(metadata, true), ((Row) u).deletion().isLive()));
+ }
+
+ /**
+ * Partition deletion should remove the range deletion when their timestamps tie
+ */
+ @Test
+ public void testPartitionDeletionRangeDeletionTie()
+ {
+ QueryProcessor.executeOnceInternal("CREATE TABLE ks.partition_range_deletion (k int, c1 int, c2 int, v int, primary key (k, c1, c2))");
+ TableMetadata metadata = Schema.instance.getTableMetadata("ks", "partition_range_deletion");
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+ cfs.disableAutoCompaction();
+
+ BiFunction<Boolean, Boolean, List<Unfiltered>> tester = (flush, multiSSTable) ->
+ {
+ cfs.truncateBlocking();
+ QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_range_deletion USING TIMESTAMP 10 WHERE k=1");
+ if (flush && multiSSTable)
+ cfs.forceBlockingFlush();
+ QueryProcessor.executeOnceInternal("DELETE FROM ks.partition_range_deletion USING TIMESTAMP 10 WHERE k=1 and c1=1");
+ if (flush)
+ cfs.forceBlockingFlush();
+
+ QueryProcessor.executeOnceInternal("INSERT INTO ks.partition_range_deletion(k,c1,c2,v) VALUES(1,1,1,1) using timestamp 11");
+ if (flush)
+ {
+ cfs.forceBlockingFlush();
+ try
+ {
+ cfs.forceMajorCompaction();
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ try (UnfilteredRowIterator partition = getIteratorFromSinglePartition("SELECT * FROM ks.partition_range_deletion where k=1 and c1=1 and c2=1"))
+ {
+ assertEquals(10, partition.partitionLevelDeletion().markedForDeleteAt());
+ return toUnfiltereds(partition);
+ }
+ };
+
+ List<Unfiltered> memtableUnfiltereds = tester.apply(false, false);
+ List<Unfiltered> singleSSTableUnfiltereds = tester.apply(true, false);
+ List<Unfiltered> multiSSTableUnfiltereds = tester.apply(true, true);
+
+ assertEquals(1, singleSSTableUnfiltereds.size());
+ String errorMessage = String.format("Expected %s but got %s", toString(memtableUnfiltereds, metadata), toString(singleSSTableUnfiltereds, metadata));
+ assertEquals(errorMessage, memtableUnfiltereds, singleSSTableUnfiltereds);
+ errorMessage = String.format("Expected %s but got %s", toString(singleSSTableUnfiltereds, metadata), toString(multiSSTableUnfiltereds, metadata));
+ assertEquals(errorMessage, singleSSTableUnfiltereds, multiSSTableUnfiltereds);
+ memtableUnfiltereds.forEach(u -> assertTrue("Expected row, but got " + u.toString(metadata, true), u.isRow()));
+ }
+
@Test
public void toCQLStringIsSafeToCall() throws IOException
{
@@ -335,12 +457,10 @@ public class SinglePartitionSliceCommandTest
Assert.assertFalse(ret.isEmpty());
}
-
- public static List<Unfiltered> getUnfilteredsFromSinglePartition(String q)
+ public static UnfilteredRowIterator getIteratorFromSinglePartition(String q)
{
SelectStatement stmt = (SelectStatement) QueryProcessor.parseStatement(q).prepare(ClientState.forInternalCalls());
- List<Unfiltered> unfiltereds = new ArrayList<>();
SinglePartitionReadQuery.Group<SinglePartitionReadCommand> query = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) stmt.getQuery(QueryOptions.DEFAULT, 0);
Assert.assertEquals(1, query.queries.size());
SinglePartitionReadCommand command = Iterables.getOnlyElement(query.queries);
@@ -348,20 +468,26 @@ public class SinglePartitionSliceCommandTest
UnfilteredPartitionIterator partitions = command.executeLocally(controller))
{
assert partitions.hasNext();
- try (UnfilteredRowIterator partition = partitions.next())
- {
- while (partition.hasNext())
- {
- Unfiltered next = partition.next();
- unfiltereds.add(next);
- }
- }
+ UnfilteredRowIterator partition = partitions.next();
assert !partitions.hasNext();
+ return partition;
}
- return unfiltereds;
}
- private static void assertQueryReturnsSingleRT(String query)
+ public static List<Unfiltered> getUnfilteredsFromSinglePartition(String q)
+ {
+ try (UnfilteredRowIterator partition = getIteratorFromSinglePartition(q))
+ {
+ return toUnfiltereds(partition);
+ }
+ }
+
+ private static List<Unfiltered> toUnfiltereds(UnfilteredRowIterator partition)
+ {
+ return Lists.newArrayList(partition);
+ }
+
+ private static List<Unfiltered> assertQueryReturnsSingleRT(String query)
{
List<Unfiltered> unfiltereds = getUnfilteredsFromSinglePartition(query);
Assert.assertEquals(2, unfiltereds.size());
@@ -369,6 +495,7 @@ public class SinglePartitionSliceCommandTest
Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(0)).isOpen(false));
Assert.assertTrue(unfiltereds.get(1).isRangeTombstoneMarker());
Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(1)).isClose(false));
+ return unfiltereds;
}
private static ByteBuffer bb(int v)
@@ -411,4 +538,9 @@ public class SinglePartitionSliceCommandTest
assertQueryReturnsSingleRT("SELECT * FROM ks.legacy_mc_inaccurate_min_max WHERE k=100 AND c1=3 AND c2=2 AND c3=2"); // clustering names
}
+
+ private String toString(List<Unfiltered> unfiltereds, TableMetadata metadata)
+ {
+ return unfiltereds.stream().map(u -> u.toString(metadata, true)).collect(Collectors.toList()).toString();
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
index effc54f..5b3681e 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionImplementationTest.java
@@ -50,7 +50,6 @@ import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.SearchIterator;
public class PartitionImplementationTest
{
@@ -331,11 +330,11 @@ public class PartitionImplementationTest
assertIteratorsEqual(streamOf(invert(slice(sortedContent, multiSlices))).map(colFilter).iterator(),
partition.unfilteredIterator(cf, multiSlices, true));
- // search iterator
- testSearchIterator(sortedContent, partition, ColumnFilter.all(metadata), false);
- testSearchIterator(sortedContent, partition, cf, false);
- testSearchIterator(sortedContent, partition, ColumnFilter.all(metadata), true);
- testSearchIterator(sortedContent, partition, cf, true);
+ // clustering iterator
+ testClusteringsIterator(sortedContent, partition, ColumnFilter.all(metadata), false);
+ testClusteringsIterator(sortedContent, partition, cf, false);
+ testClusteringsIterator(sortedContent, partition, ColumnFilter.all(metadata), true);
+ testClusteringsIterator(sortedContent, partition, cf, true);
// sliceable iter
testSlicingOfIterators(sortedContent, partition, ColumnFilter.all(metadata), false);
@@ -344,30 +343,34 @@ public class PartitionImplementationTest
testSlicingOfIterators(sortedContent, partition, cf, true);
}
- void testSearchIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
+ private void testClusteringsIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
{
- SearchIterator<Clustering<?>, Row> searchIter = partition.searchIterator(cf, reversed);
- int pos = reversed ? KEY_RANGE : 0;
- int mul = reversed ? -1 : 1;
- boolean started = false;
- while (pos < KEY_RANGE)
+ Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, metadata) : x;
+ NavigableSet<Clustering<?>> clusteringsInQueryOrder = makeClusterings(reversed);
+
+ // fetch each clustering in turn
+ for (Clustering clustering : clusteringsInQueryOrder)
+ {
+ NavigableSet<Clustering<?>> single = new TreeSet<>(metadata.comparator);
+ single.add(clustering);
+ try (UnfilteredRowIterator slicedIter = partition.unfilteredIterator(cf, single, reversed))
+ {
+ assertIteratorsEqual(streamOf(directed(slice(sortedContent, Slice.make(clustering)), reversed)).map(colFilter).iterator(),
+ slicedIter);
+ }
+ }
+
+ // Fetch all slices at once
+ try (UnfilteredRowIterator slicedIter = partition.unfilteredIterator(cf, clusteringsInQueryOrder, reversed))
{
- int skip = rand.nextInt(KEY_RANGE / 10);
- pos += skip * mul;
- Clustering<?> cl = clustering(pos);
- Row row = searchIter.next(cl); // returns row with deletion, incl. empty row with deletion
- if (row == null && skip == 0 && started) // allowed to return null if already reported row
- continue;
- started = true;
- Row expected = getRow(sortedContent, cl);
- assertEquals(expected == null, row == null);
- if (row == null)
- continue;
- assertRowsEqual(expected.filter(cf, metadata), row);
+ List<Iterator<? extends Clusterable>> clusterableIterators = new ArrayList<>();
+ clusteringsInQueryOrder.forEach(clustering -> clusterableIterators.add(directed(slice(sortedContent, Slice.make(clustering)), reversed)));
+
+ assertIteratorsEqual(Iterators.concat(clusterableIterators.toArray(new Iterator[0])), slicedIter);
}
}
- Slices makeSlices()
+ private Slices makeSlices()
{
int pos = 0;
Slices.Builder builder = new Slices.Builder(metadata.comparator);
@@ -385,7 +388,20 @@ public class PartitionImplementationTest
return builder.build();
}
- void testSlicingOfIterators(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
+ private NavigableSet<Clustering<?>> makeClusterings(boolean reversed)
+ {
+ int pos = 0;
+ NavigableSet<Clustering<?>> clusterings = new TreeSet<>(reversed ? metadata.comparator.reversed() : metadata.comparator);
+ while (pos <= KEY_RANGE)
+ {
+ int skip = rand.nextInt(KEY_RANGE / 10) * (rand.nextInt(3) + 2 / 3); // increased chance of getting 0
+ pos += skip;
+ clusterings.add(clustering(pos));
+ }
+ return clusterings;
+ }
+
+ private void testSlicingOfIterators(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
{
Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, metadata) : x;
Slices slices = makeSlices();