You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/02/10 14:15:14 UTC
cassandra git commit: Optimize disk seek using min/max column name meta data when the LIMIT clause is used
Repository: cassandra
Updated Branches:
refs/heads/trunk 2fe34badb -> b11fba750
Optimize disk seek using min/max column name meta data when the LIMIT clause is used
patch by Stefania Alborghetti; reviewed by blambov for CASSANDRA-8180
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b11fba75
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b11fba75
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b11fba75
Branch: refs/heads/trunk
Commit: b11fba750c610de5e97acba070cc571cf0a96416
Parents: 2fe34ba
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 20 08:02:39 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 10 13:14:56 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/config/DatabaseDescriptor.java | 6 +
.../db/SinglePartitionReadCommand.java | 144 +++---
.../columniterator/AbstractSSTableIterator.java | 5 +-
.../LazilyInitializedUnfilteredRowIterator.java | 7 +-
.../UnfilteredRowIteratorWithLowerBound.java | 212 +++++++++
.../io/sstable/format/SSTableReader.java | 16 +
.../apache/cassandra/utils/IMergeIterator.java | 1 +
.../cassandra/utils/IteratorWithLowerBound.java | 24 +
.../apache/cassandra/utils/MergeIterator.java | 47 +-
.../org/apache/cassandra/cql3/CQLTester.java | 23 +-
.../validation/entities/StaticColumnsTest.java | 10 +
.../miscellaneous/SSTablesIteratedTest.java | 455 +++++++++++++++++++
.../cql3/validation/operations/DeleteTest.java | 60 ++-
.../cql3/validation/operations/InsertTest.java | 6 -
.../cql3/validation/operations/UpdateTest.java | 6 -
16 files changed, 929 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cd4cf5..e6067a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.4
+ * Optimize disk seek using min/max column name meta data when the LIMIT clause is used
+ (CASSANDRA-8180)
* Add LIKE support to CQL3 (CASSANDRA-11067)
* Generic Java UDF types (CASSANDRA-10819)
* cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b09605f..5c2a5c9 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -885,6 +885,12 @@ public class DatabaseDescriptor
return conf.column_index_size_in_kb * 1024;
}
+ @VisibleForTesting
+ public static void setColumnIndexSize(int val)
+ {
+ conf.column_index_size_in_kb = val;
+ }
+
public static int getBatchSizeWarnThreshold()
{
return conf.batch_size_warn_threshold_in_kb * 1024;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 680b4b5..1a0b400 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -54,7 +55,6 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.HeapAllocator;
/**
@@ -487,9 +487,9 @@ public class SinglePartitionReadCommand extends ReadCommand
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-
List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
ClusteringIndexFilter filter = clusteringIndexFilter();
+ long minTimestamp = Long.MAX_VALUE;
try
{
@@ -499,11 +499,14 @@ public class SinglePartitionReadCommand extends ReadCommand
if (partition == null)
continue;
+ minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
}
+
/*
* We can't eliminate full sstables based on the timestamp of what we've already read like
* in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
@@ -516,16 +519,13 @@ public class SinglePartitionReadCommand extends ReadCommand
* In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
* in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
- int sstablesIterated = 0;
Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
- List<SSTableReader> skippedSSTables = null;
long mostRecentPartitionTombstone = Long.MIN_VALUE;
- long minTimestamp = Long.MAX_VALUE;
int nonIntersectingSSTables = 0;
+ List<SSTableReader> skippedSSTablesWithTombstones = null;
for (SSTableReader sstable : view.sstables)
{
- minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
@@ -534,73 +534,55 @@ public class SinglePartitionReadCommand extends ReadCommand
if (!shouldInclude(sstable))
{
nonIntersectingSSTables++;
- // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
- if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
- {
- if (skippedSSTables == null)
- skippedSSTables = new ArrayList<>();
- skippedSSTables.add(sstable);
+ if (sstable.hasTombstones())
+ { // if sstable has tombstones we need to check after one pass if it can be safely skipped
+ if (skippedSSTablesWithTombstones == null)
+ skippedSSTablesWithTombstones = new ArrayList<>();
+ skippedSSTablesWithTombstones.add(sstable);
}
continue;
}
- sstable.incrementReadCount();
- @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
- UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
+ // or through the closing of the final merged iterator
+ UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, true);
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
- mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
- sstablesIterated++;
+ iterators.add(iter);
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+ iter.partitionLevelDeletion().markedForDeleteAt());
}
int includedDueToTombstones = 0;
- // Check for partition tombstones in the skipped sstables
- if (skippedSSTables != null)
+ // Check for sstables with tombstones that are not expired
+ if (skippedSSTablesWithTombstones != null)
{
- for (SSTableReader sstable : skippedSSTables)
+ for (SSTableReader sstable : skippedSSTablesWithTombstones)
{
if (sstable.getMaxTimestamp() <= minTimestamp)
continue;
- sstable.incrementReadCount();
- @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
- UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());
- if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
- {
- iterators.add(iter);
- if (!sstable.isRepaired())
- oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- includedDueToTombstones++;
- sstablesIterated++;
- }
- else
- {
- iter.close();
- }
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
+ // or through the closing of the final merged iterator
+ UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, false);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+ iterators.add(iter);
+ includedDueToTombstones++;
}
}
if (Tracing.isTracing())
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
- nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
-
- cfs.metric.updateSSTableIterated(sstablesIterated);
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
if (iterators.isEmpty())
return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
- Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
-
- @SuppressWarnings("resource") // Closed through the closing of the result of that method.
- UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
- if (!merged.isEmpty())
- {
- DecoratedKey key = merged.partitionKey();
- cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
- }
-
- return withStateTracking(merged);
+ return withStateTracking(withSSTablesIterated(iterators, cfs.metric));
}
catch (RuntimeException | Error e)
{
@@ -627,6 +609,50 @@ public class SinglePartitionReadCommand extends ReadCommand
return clusteringIndexFilter().shouldInclude(sstable);
}
+ private UnfilteredRowIteratorWithLowerBound makeIterator(final SSTableReader sstable, boolean applyThriftTransformation)
+ {
+ return new UnfilteredRowIteratorWithLowerBound(partitionKey(),
+ sstable,
+ clusteringIndexFilter(),
+ columnFilter(),
+ isForThrift(),
+ nowInSec(),
+ applyThriftTransformation);
+ }
+
+ /**
+ * Return a wrapped iterator that when closed will update the sstables iterated and READ sample metrics.
+ * Note that we cannot use the Transformations framework because they greedily get the static row, which
+ * would cause all iterators to be initialized and hence all sstables to be accessed.
+ */
+ private UnfilteredRowIterator withSSTablesIterated(List<UnfilteredRowIterator> iterators,
+ TableMetrics metrics)
+ {
+ @SuppressWarnings("resource") // Closed through the closing of the result of the caller method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ metrics.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ class UpdateSstablesIterated extends Transformation
+ {
+ public void onPartitionClose()
+ {
+ int sstablesIterated = (int)iterators.stream()
+ .filter(it -> it instanceof LazilyInitializedUnfilteredRowIterator)
+ .filter(it -> ((LazilyInitializedUnfilteredRowIterator)it).initialized())
+ .count();
+
+ metrics.updateSSTableIterated(sstablesIterated);
+ Tracing.trace("Merged data from memtables and {} sstables", sstablesIterated);
+ }
+ };
+ return Transformation.apply(merged, new UpdateSstablesIterated());
+ }
+
private boolean queryNeitherCountersNorCollections()
{
for (ColumnDefinition column : columnFilter().fetchedColumns())
@@ -693,8 +719,8 @@ public class SinglePartitionReadCommand extends ReadCommand
// however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
// some seek in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
// has any tombstone at all as a shortcut.
- if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
- continue; // Means no tombstone at all, we can skip that sstable
+ if (!sstable.hasTombstones())
+ continue; // no tombstone at all, we can skip that sstable
// We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
sstable.incrementReadCount();
@@ -711,7 +737,7 @@ public class SinglePartitionReadCommand extends ReadCommand
Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
sstable.incrementReadCount();
- try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());)
+ try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
{
if (iter.isEmpty())
continue;
@@ -741,13 +767,9 @@ public class SinglePartitionReadCommand extends ReadCommand
try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
{
final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
- StageManager.getStage(Stage.MUTATION).execute(new Runnable()
- {
- public void run()
- {
- // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
- Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
- }
+ StageManager.getStage(Stage.MUTATION).execute(() -> {
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
});
}
}
@@ -909,7 +931,7 @@ public class SinglePartitionReadCommand extends ReadCommand
public static Group one(SinglePartitionReadCommand command)
{
- return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+ return new Group(Collections.singletonList(command), command.limits());
}
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 792f5ad..d55161b 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.columniterator;
import java.io.IOException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -217,9 +216,7 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator
public EncodingStats stats()
{
- // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
- // SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
+ return sstable.stats();
}
public boolean hasNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
index 1bf78dd..fc5bdbe 100644
--- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -42,12 +42,17 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIte
protected abstract UnfilteredRowIterator initializeIterator();
- private void maybeInit()
+ protected void maybeInit()
{
if (iterator == null)
iterator = initializeIterator();
}
+ public boolean initialized()
+ {
+ return iterator != null;
+ }
+
public CFMetaData metadata()
{
maybeInit();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
new file mode 100644
index 0000000..4f55677
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -0,0 +1,212 @@
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.utils.IteratorWithLowerBound;
+
+/**
+ * An unfiltered row iterator with a lower bound retrieved from either the global
+ * sstable statistics or the row index lower bounds (if available in the cache).
+ * Before initializing the sstable unfiltered row iterator, we return an empty row
+ * with the clustering set to the lower bound. The empty row will be filtered out and
+ * the result is that if we don't need to access this sstable, i.e. due to the LIMIT conditon,
+ * then we will not. See CASSANDRA-8180 for examples of why this is useful.
+ */
+public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilteredRowIterator implements IteratorWithLowerBound<Unfiltered>
+{
+ private final SSTableReader sstable;
+ private final ClusteringIndexFilter filter;
+ private final ColumnFilter selectedColumns;
+ private final boolean isForThrift;
+ private final int nowInSec;
+ private final boolean applyThriftTransformation;
+ private RangeTombstone.Bound lowerBound;
+ private boolean firstItemRetrieved;
+
+ public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
+ SSTableReader sstable,
+ ClusteringIndexFilter filter,
+ ColumnFilter selectedColumns,
+ boolean isForThrift,
+ int nowInSec,
+ boolean applyThriftTransformation)
+ {
+ super(partitionKey);
+ this.sstable = sstable;
+ this.filter = filter;
+ this.selectedColumns = selectedColumns;
+ this.isForThrift = isForThrift;
+ this.nowInSec = nowInSec;
+ this.applyThriftTransformation = applyThriftTransformation;
+ this.lowerBound = null;
+ this.firstItemRetrieved = false;
+ }
+
+ public Unfiltered lowerBound()
+ {
+ if (lowerBound != null)
+ return makeBound(lowerBound);
+
+ // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only
+ // present if the iterator has already been initialized, which we only do when there are tombstones since in
+ // this case we cannot use the sstable metadata clustering values
+ RangeTombstone.Bound ret = getPartitionIndexLowerBound();
+ return ret != null ? makeBound(ret) : makeBound(getMetadataLowerBound());
+ }
+
+ private Unfiltered makeBound(RangeTombstone.Bound bound)
+ {
+ if (bound == null)
+ return null;
+
+ if (lowerBound != bound)
+ lowerBound = bound;
+
+ return new RangeTombstoneBoundMarker(lowerBound, DeletionTime.LIVE);
+ }
+
+ @Override
+ protected UnfilteredRowIterator initializeIterator()
+ {
+ sstable.incrementReadCount();
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift);
+ return isForThrift && applyThriftTransformation
+ ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
+ : iter;
+ }
+
+ @Override
+ protected Unfiltered computeNext()
+ {
+ Unfiltered ret = super.computeNext();
+ if (firstItemRetrieved)
+ return ret;
+
+ // Check that the lower bound is not bigger than the first item retrieved
+ firstItemRetrieved = true;
+ if (lowerBound != null && ret != null)
+ assert comparator().compare(lowerBound, ret.clustering()) <= 0
+ : String.format("Lower bound [%s ]is bigger than first returned value [%s] for sstable %s",
+ lowerBound.toString(sstable.metadata),
+ ret.toString(sstable.metadata),
+ sstable.getFilename());
+
+ return ret;
+ }
+
+ private Comparator<Clusterable> comparator()
+ {
+ return filter.isReversed() ? sstable.metadata.comparator.reversed() : sstable.metadata.comparator;
+ }
+
+ @Override
+ public CFMetaData metadata()
+ {
+ return sstable.metadata;
+ }
+
+ @Override
+ public boolean isReverseOrder()
+ {
+ return filter.isReversed();
+ }
+
+ @Override
+ public PartitionColumns columns()
+ {
+ return selectedColumns.fetchedColumns();
+ }
+
+ @Override
+ public EncodingStats stats()
+ {
+ return sstable.stats();
+ }
+
+ @Override
+ public DeletionTime partitionLevelDeletion()
+ {
+ if (!sstable.hasTombstones())
+ return DeletionTime.LIVE;
+
+ return super.partitionLevelDeletion();
+ }
+
+ @Override
+ public Row staticRow()
+ {
+ if (columns().statics.isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ return super.staticRow();
+ }
+
+ /**
+ * @return the lower bound stored on the index entry for this partition, if available.
+ */
+ private RangeTombstone.Bound getPartitionIndexLowerBound()
+ {
+ // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than
+ // DatabaseDescriptor.column_index_size_in_kb)
+ if (!canUseMetadataLowerBound())
+ maybeInit();
+
+ RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false);
+ if (rowIndexEntry == null)
+ return null;
+
+ List<IndexHelper.IndexInfo> columns = rowIndexEntry.columnsIndex();
+ if (columns.size() == 0)
+ return null;
+
+ IndexHelper.IndexInfo column = columns.get(filter.isReversed() ? columns.size() - 1 : 0);
+ ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? column.lastName : column.firstName;
+ assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() :
+ String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
+ lowerBoundPrefix.getRawValues().length,
+ sstable.metadata.comparator.size(),
+ sstable.getFilename());
+ return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues());
+ }
+
+ /**
+ * @return true if we can use the clustering values in the stats of the sstable:
+ * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size)
+ * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
+ */
+ private boolean canUseMetadataLowerBound()
+ {
+ return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile();
+ }
+
+ /**
+ * @return a global lower bound made from the clustering values stored in the sstable metadata, note that
+ * this currently does not correctly compare tombstone bounds, especially ranges.
+ */
+ private RangeTombstone.Bound getMetadataLowerBound()
+ {
+ if (!canUseMetadataLowerBound())
+ return null;
+
+ final StatsMetadata m = sstable.getSSTableMetadata();
+ List<ByteBuffer> vals = filter.isReversed() ? m.maxClusteringValues : m.minClusteringValues;
+ assert vals.size() <= sstable.metadata.comparator.size() :
+ String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
+ vals.size(),
+ sstable.metadata.comparator.size(),
+ sstable.getFilename());
+ return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), vals.toArray(new ByteBuffer[vals.size()]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index e152540..495d831 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
@@ -1922,6 +1923,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return sstableMetadata.maxLocalDeletionTime;
}
+ /** sstable contains no tombstones if maxLocalDeletionTime == Integer.MAX_VALUE */
+ public boolean hasTombstones()
+ {
+ // sstable contains no tombstone if minLocalDeletionTime is still set to the default value Integer.MAX_VALUE
+ // which is bigger than any valid deletion times
+ return getMinLocalDeletionTime() != Integer.MAX_VALUE;
+ }
+
public int getMinTTL()
{
return sstableMetadata.minTTL;
@@ -2072,6 +2081,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
}
+ public EncodingStats stats()
+ {
+ // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
+ // SerializationHeader.make() for details) so we use the latter instead.
+ return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL());
+ }
+
public Ref<SSTableReader> tryRef()
{
return selfRef.tryRef();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IMergeIterator.java b/src/java/org/apache/cassandra/utils/IMergeIterator.java
index deddc4c..e45b897 100644
--- a/src/java/org/apache/cassandra/utils/IMergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/IMergeIterator.java
@@ -21,5 +21,6 @@ import java.util.Iterator;
public interface IMergeIterator<In, Out> extends CloseableIterator<Out>
{
+
Iterable<? extends Iterator<In>> iterators();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
new file mode 100644
index 0000000..85eeede
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+public interface IteratorWithLowerBound<In>
+{
+ In lowerBound();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/MergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java
index 70daad9..c9e445b 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -200,7 +200,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
reducer.onKeyChange();
assert !heap[0].equalParent;
- reducer.reduce(heap[0].idx, heap[0].consume());
+ heap[0].consume(reducer);
final int size = this.size;
final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE);
int i;
@@ -209,7 +209,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
{
if (!heap[i].equalParent)
break consume;
- reducer.reduce(heap[i].idx, heap[i].consume());
+ heap[i].consume(reducer);
}
i = Math.max(i, consumeHeap(i) + 1);
}
@@ -227,7 +227,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
if (idx >= size || !heap[idx].equalParent)
return -1;
- reducer.reduce(heap[idx].idx, heap[idx].consume());
+ heap[idx].consume(reducer);
int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1);
return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1)));
}
@@ -351,6 +351,8 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
private final Comparator<? super In> comp;
private final int idx;
private In item;
+ /** Lower bound reported by an IteratorWithLowerBound, or null if there is none or it has already been consumed. */
+ private In lowerBound;
boolean equalParent;
public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
@@ -358,29 +360,62 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
this.iter = iter;
this.comp = comp;
this.idx = idx;
+ this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null;
}
/** @return this if our iterator had an item, and it is now available, otherwise null */
protected Candidate<In> advance()
{
+ // hand out the lower bound first; consume() clears it, after which the iterator's real items follow
+ if (lowerBound != null)
+ {
+ item = lowerBound;
+ return this;
+ }
+
if (!iter.hasNext())
return null;
+
item = iter.next();
return this;
}
public int compareTo(Candidate<In> that)
{
- assert item != null && that.item != null;
- return comp.compare(this.item, that.item);
+ assert this.item != null && that.item != null;
+ int ret = comp.compare(this.item, that.item);
+ if (ret == 0 && (this.isLowerBound() ^ that.isLowerBound()))
+ { // if the items are equal and one of them is a lower bound (but not the other one)
+ // then ensure the lower bound is less than the real item so we can safely
+ // skip lower bounds when consuming
+ return this.isLowerBound() ? -1 : 1;
+ }
+ return ret;
+ }
+
+ /** @return true if the current item is the not-yet-consumed lower bound rather than a real item */
+ private boolean isLowerBound()
+ {
+ assert item != null;
+ return item == lowerBound;
}
- public In consume()
+ /**
+ * Consumes the current item: a lower bound is discarded without being passed to the reducer,
+ * whereas a real item is passed to the reducer together with this candidate's index.
+ */
+ public void consume(Reducer reducer)
{
- In temp = item;
- item = null;
- assert temp != null;
- return temp;
+ if (isLowerBound())
+ {
+ item = null;
+ lowerBound = null;
+ }
+ else
+ {
+ reducer.reduce(idx, item);
+ item = null;
+ }
}
public boolean needsAdvance()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 71bc238..3c9cbbd 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -367,6 +367,12 @@ public abstract class CQLTester
: Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable);
}
+ /** Calls {@link #flush()} only when {@code forceFlush} is true, so one test body can run with or without flushing. */
+ public void flush(boolean forceFlush)
+ {
+ if (forceFlush)
+ flush();
+ }
+
public void flush()
{
ColumnFamilyStore store = getCurrentColumnFamilyStore();
@@ -374,6 +380,12 @@ public abstract class CQLTester
store.forceBlockingFlush();
}
+ /** Disables automatic compaction on the current table's ColumnFamilyStore. */
+ public void disableCompaction()
+ {
+ ColumnFamilyStore store = getCurrentColumnFamilyStore();
+ store.disableAutoCompaction();
+ }
+
public void compact()
{
try
@@ -809,8 +821,17 @@ public abstract class CQLTester
{
while (iter.hasNext())
{
- iter.next();
+ UntypedResultSet.Row actual = iter.next();
i++;
+
+ // log every unexpected extra row before failing, so the failure below is easier to diagnose
+ StringBuilder str = new StringBuilder();
+ for (int j = 0; j < meta.size(); j++)
+ {
+ ColumnSpecification column = meta.get(j);
+ ByteBuffer actualValue = actual.getBytes(column.name.toString());
+ str.append(String.format("%s=%s ", column.name, formatValue(actualValue, column.type)));
+ }
+ logger.info("Extra row num {}: {}", i, str.toString());
}
Assert.fail(String.format("Got more rows than expected. Expected %d but got %d.", rows.length, i));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
index cef6f1f..db7487e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
@@ -37,9 +37,16 @@ public class StaticColumnsTest extends CQLTester
@Test
public void testStaticColumns() throws Throwable
{
+ testStaticColumns(false);
+ testStaticColumns(true);
+ }
+
+ // forceFlush=false keeps all writes in the memtable; forceFlush=true flushes after each group of
+ // writes, so the same assertions are also checked against flushed data
+ private void testStaticColumns(boolean forceFlush) throws Throwable
+ {
createTable("CREATE TABLE %s ( k int, p int, s int static, v int, PRIMARY KEY (k, p))");
execute("INSERT INTO %s(k, s) VALUES (0, 42)");
+ flush(forceFlush);
assertRows(execute("SELECT * FROM %s"), row(0, null, 42, null));
@@ -51,6 +58,7 @@ public class StaticColumnsTest extends CQLTester
execute("INSERT INTO %s (k, p, s, v) VALUES (0, 0, 12, 0)");
execute("INSERT INTO %s (k, p, s, v) VALUES (0, 1, 24, 1)");
+ flush(forceFlush);
// Check the static columns in indeed "static"
assertRows(execute("SELECT * FROM %s"), row(0, 0, 24, 0), row(0, 1, 24, 1));
@@ -81,10 +89,12 @@ public class StaticColumnsTest extends CQLTester
// Check that deleting a row don't implicitely deletes statics
execute("DELETE FROM %s WHERE k=0 AND p=0");
+ flush(forceFlush);
assertRows(execute("SELECT * FROM %s"),row(0, 1, 24, 1));
// But that explicitely deleting the static column does remove it
execute("DELETE s FROM %s WHERE k=0");
+ flush(forceFlush);
assertRows(execute("SELECT * FROM %s"), row(0, 1, null, 1));
// Check we can add a static column ...
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
new file mode 100644
index 0000000..720108a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.metrics.ClearableHistogram;
+
+/**
+ * Tests for checking how many sstables we access during cql queries with LIMIT specified,
+ * see CASSANDRA-8180.
+ */
+public class SSTablesIteratedTest extends CQLTester
+{
+ /** Column index size, in KB, in effect before each test; restored afterwards because some tests shrink it. */
+ private int columnIndexSizeInKb;
+
+ @Before
+ public void saveColumnIndexSize()
+ {
+ // getColumnIndexSize() is in bytes while setColumnIndexSize() takes KB (NOTE(review): confirm against DatabaseDescriptor)
+ columnIndexSizeInKb = DatabaseDescriptor.getColumnIndexSize() / 1024;
+ }
+
+ @After
+ public void restoreColumnIndexSize()
+ {
+ DatabaseDescriptor.setColumnIndexSize(columnIndexSizeInKb);
+ }
+
+ /** Runs the query, checks the returned rows, then checks the maximum number of sstables read per query. */
+ private void executeAndCheck(String query, int numSSTables, Object[]... rows) throws Throwable
+ {
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ ((ClearableHistogram) cfs.metric.sstablesPerReadHistogram.cf).clear(); // resets counts
+
+ assertRows(execute(query), rows);
+
+ assertEquals(numSSTables, cfs.metric.sstablesPerReadHistogram.cf.getSnapshot().getMax()); // max sstables read
+ }
+
+ @Override
+ protected String createTable(String query)
+ {
+ String ret = super.createTable(query);
+ disableCompaction();
+ return ret;
+ }
+
+ @Test
+ public void testSSTablesOnlyASC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
+ }
+
+ @Test
+ public void testMixedMemtableSStablesASC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 10, "10"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 10, "10"));
+ }
+
+ @Test
+ public void testOverlappingSStablesASC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
+ }
+
+ @Test
+ public void testSSTablesOnlyDESC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
+ }
+
+ @Test
+ public void testMixedMemtableSStablesDESC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 30, "30"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 30, "30"));
+ }
+
+ @Test
+ public void testOverlappingSStablesDESC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
+ }
+
+ @Test
+ public void testDeletionOnDifferentSSTables() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ flush();
+
+ execute("DELETE FROM %s WHERE id=1 and col=30");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 4, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 4, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 4, row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 2);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 3, row(1, 20, "20"));
+ }
+
+ @Test
+ public void testDeletionOnSameSSTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ execute("DELETE FROM %s WHERE id=1 and col=30");
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 2, row(1, 20, "20"));
+ }
+
+ @Test
+ public void testDeletionOnMemTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ execute("DELETE FROM %s WHERE id=1 and col=30");
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 20, "20"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 20, "20"), row(1, 10, "10"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 20, "20"), row(1, 10, "10"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 20, "20"));
+ }
+
+ @Test
+ public void testDeletionOnIndexedSSTableDESC() throws Throwable
+ {
+ testDeletionOnIndexedSSTableDESC(true);
+ testDeletionOnIndexedSSTableDESC(false);
+ }
+
+ private void testDeletionOnIndexedSSTableDESC(boolean deleteWithRange) throws Throwable
+ {
+ // reduce the column index size so that columns get indexed during flush (restored by restoreColumnIndexSize)
+ DatabaseDescriptor.setColumnIndexSize(1);
+
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ for (int i = 1; i <= 1000; i++)
+ {
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ }
+ flush();
+
+ Object[][] allRows = new Object[1000][];
+ for (int i = 1001; i <= 2000; i++)
+ {
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ allRows[2000 - i] = row(1, i, Integer.toString(i));
+ }
+
+ if (deleteWithRange)
+ {
+ execute("DELETE FROM %s WHERE id=1 and col <= ?", 1000);
+ }
+ else
+ {
+ for (int i = 1; i <= 1000; i++)
+ execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+ }
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 2000, "2000"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 2000, "2000"), row(1, 1999, "1999"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 1, row(1, 2000, "2000"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 1, row(1, 2000, "2000"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 1, allRows);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 2, allRows);
+ }
+
+ @Test
+ public void testDeletionOnIndexedSSTableASC() throws Throwable
+ {
+ testDeletionOnIndexedSSTableASC(true);
+ testDeletionOnIndexedSSTableASC(false);
+ }
+
+ private void testDeletionOnIndexedSSTableASC(boolean deleteWithRange) throws Throwable
+ {
+ // reduce the column index size so that columns get indexed during flush (restored by restoreColumnIndexSize)
+ DatabaseDescriptor.setColumnIndexSize(1);
+
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+ for (int i = 1; i <= 1000; i++)
+ {
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ }
+ flush();
+
+ Object[][] allRows = new Object[1000][];
+ for (int i = 1001; i <= 2000; i++)
+ {
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ allRows[i - 1001] = row(1, i, Integer.toString(i));
+ }
+ flush();
+
+ if (deleteWithRange)
+ {
+ execute("DELETE FROM %s WHERE id =1 and col <= ?", 1000);
+ }
+ else
+ {
+ for (int i = 1; i <= 1000; i++)
+ execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+ }
+ flush();
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 1001, "1001"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 1001, "1001"), row(1, 1002, "1002"));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 3, allRows);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 3, row(1, 1001, "1001"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 2, allRows);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 3, allRows);
+ }
+
+ @Test
+ public void testDeletionOnOverlappingIndexedSSTable() throws Throwable
+ {
+ testDeletionOnOverlappingIndexedSSTable(true);
+ testDeletionOnOverlappingIndexedSSTable(false);
+ }
+
+ private void testDeletionOnOverlappingIndexedSSTable(boolean deleteWithRange) throws Throwable
+ {
+ // reduce the column index size so that columns get indexed during flush (restored by restoreColumnIndexSize)
+ DatabaseDescriptor.setColumnIndexSize(1);
+
+ createTable("CREATE TABLE %s (id int, col int, val1 text, val2 text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+ for (int i = 1; i <= 500; i++)
+ {
+ if (i % 2 == 0)
+ execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ else
+ execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+ }
+
+ for (int i = 1001; i <= 1500; i++)
+ {
+ if (i % 2 == 0)
+ execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ else
+ execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+ }
+
+ flush();
+
+ for (int i = 501; i <= 1000; i++)
+ {
+ if (i % 2 == 0)
+ execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ else
+ execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+ }
+
+ for (int i = 1501; i <= 2000; i++)
+ {
+ if (i % 2 == 0)
+ execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+ else
+ execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+ }
+
+ if (deleteWithRange)
+ {
+ execute("DELETE FROM %s WHERE id=1 and col > ? and col <= ?", 250, 750);
+ }
+ else
+ {
+ for (int i = 251; i <= 750; i++)
+ execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+ }
+
+ flush();
+
+ Object[][] allRows = new Object[1500][]; // non deleted rows
+ for (int i = 1; i <= 2000; i++)
+ {
+ if (i > 250 && i <= 750)
+ continue; // skip deleted records
+
+ int idx = (i <= 250 ? i - 1 : i - 501);
+
+ if (i % 2 == 0)
+ allRows[idx] = row(1, i, Integer.toString(i), null);
+ else
+ allRows[idx] = row(1, i, Integer.toString(i), Integer.toString(i));
+ }
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 1, "1", "1"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 1, "1", "1"), row(1, 2, "2", null));
+
+ executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001", "1001"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 2, row(1, 1, "1", "1"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 500 LIMIT 1", 2, row(1, 751, "751", "751"));
+ executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 500 LIMIT 1", 2, row(1, 1, "1", "1"));
+ }
+
+ @Test
+ public void testMultiplePartitionsDESC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 10, "10");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 10, "10");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 20, "20");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 20, "20");
+ flush();
+
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 30, "30");
+ execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 30, "30");
+ flush();
+
+ for (int i = 1; i <= 3; i++)
+ {
+ String base = "SELECT * FROM %s ";
+
+ executeAndCheck(base + String.format("WHERE id=%d LIMIT 1", i), 1, row(i, 30, "30"));
+ executeAndCheck(base + String.format("WHERE id=%d LIMIT 2", i), 2, row(i, 30, "30"), row(i, 20, "20"));
+ executeAndCheck(base + String.format("WHERE id=%d LIMIT 3", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
+ executeAndCheck(base + String.format("WHERE id=%d", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
+
+ executeAndCheck(base + String.format("WHERE id=%d AND col > 25 LIMIT 1", i), 1, row(i, 30, "30"));
+ executeAndCheck(base + String.format("WHERE id=%d AND col < 40 LIMIT 1", i), 1, row(i, 30, "30"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
index da0bc33..170f85f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
@@ -326,6 +326,27 @@ public class DeleteTest extends CQLTester
assertEmpty(execute("select * from %s where a=1 and b=1"));
}
+
+ /** Test that two deleted rows for the same partition but on different sstables do not resurface */
+ @Test
+ public void testDeletedRowsDoNotResurface() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')");
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')");
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')");
+ flush();
+
+ execute("DELETE FROM %s where a=1 and b = 1");
+ flush();
+
+ execute("DELETE FROM %s where a=1 and b = 2");
+ flush();
+
+ assertRows(execute("SELECT * FROM %s WHERE a = ?", 1),
+ row(1, 3, "3"));
+ }
+
@Test
public void testDeleteWithNoClusteringColumns() throws Throwable
{
@@ -624,6 +645,39 @@ public class DeleteTest extends CQLTester
}
@Test
+ public void testDeleteWithNonoverlappingRange() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (a, b, c) VALUES(1, ?, 'abc')", i);
+ flush();
+
+ execute("DELETE FROM %s WHERE a=1 and b <= 3");
+ flush();
+
+ // the query range (b <= 2) is contained in, but not equal to, the tombstone range above (b <= 3); such a query used to resurrect the deleted rows
+ assertEmpty(execute("SELECT * FROM %s WHERE a=1 and b <= 2"));
+ }
+
+ @Test
+ public void testDeleteWithIntermediateRangeAndOneClusteringColumn() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')");
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')");
+ execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3");
+ // row 2 is written after the range deletion above, so it survives until the same range is deleted again in a later sstable
+ execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')");
+ flush();
+
+ execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3");
+ flush();
+
+ assertRows(execute("SELECT * FROM %s WHERE a = ?", 1),
+ row(1, 1, "1"));
+ }
+
+ @Test
public void testDeleteWithRangeAndOneClusteringColumn() throws Throwable
{
testDeleteWithRangeAndOneClusteringColumn(false);
@@ -1057,10 +1111,4 @@ public class DeleteTest extends CQLTester
compact();
assertRows(execute("SELECT * FROM %s"), row(0, null));
}
-
- private void flush(boolean forceFlush)
- {
- if (forceFlush)
- flush();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
index 3c49989..ff98f6b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
@@ -283,10 +283,4 @@ public class InsertTest extends CQLTester
assertInvalidMessage("Some clustering keys are missing: clustering_1",
"INSERT INTO %s (partitionKey, clustering_2, staticValue) VALUES (0, 0, 'A')");
}
-
- private void flush(boolean forceFlush)
- {
- if (forceFlush)
- flush();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
index b939b7f..2df6fd6 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
@@ -524,10 +524,4 @@ public class UpdateTest extends CQLTester
assertRows(execute("SELECT l FROM %s WHERE k = 0"), row(list("v1", "v4", "v3")));
}
-
- private void flush(boolean forceFlush)
- {
- if (forceFlush)
- flush();
- }
}