You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/02/10 14:15:14 UTC

cassandra git commit: Optimize disk seek using min/max column name meta data when the LIMIT clause is used

Repository: cassandra
Updated Branches:
  refs/heads/trunk 2fe34badb -> b11fba750


Optimize disk seek using min/max column name meta data when the LIMIT clause is used

patch by Stefania Alborghetti; reviewed blambov for CASSANDRA-8180


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b11fba75
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b11fba75
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b11fba75

Branch: refs/heads/trunk
Commit: b11fba750c610de5e97acba070cc571cf0a96416
Parents: 2fe34ba
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 20 08:02:39 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 10 13:14:56 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   6 +
 .../db/SinglePartitionReadCommand.java          | 144 +++---
 .../columniterator/AbstractSSTableIterator.java |   5 +-
 .../LazilyInitializedUnfilteredRowIterator.java |   7 +-
 .../UnfilteredRowIteratorWithLowerBound.java    | 212 +++++++++
 .../io/sstable/format/SSTableReader.java        |  16 +
 .../apache/cassandra/utils/IMergeIterator.java  |   1 +
 .../cassandra/utils/IteratorWithLowerBound.java |  24 +
 .../apache/cassandra/utils/MergeIterator.java   |  47 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |  23 +-
 .../validation/entities/StaticColumnsTest.java  |  10 +
 .../miscellaneous/SSTablesIteratedTest.java     | 455 +++++++++++++++++++
 .../cql3/validation/operations/DeleteTest.java  |  60 ++-
 .../cql3/validation/operations/InsertTest.java  |   6 -
 .../cql3/validation/operations/UpdateTest.java  |   6 -
 16 files changed, 929 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cd4cf5..e6067a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.4
+ * Optimize disk seek using min/max column name meta data when the LIMIT clause is used
+   (CASSANDRA-8180)
  * Add LIKE support to CQL3 (CASSANDRA-11067)
  * Generic Java UDF types (CASSANDRA-10819)
  * cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b09605f..5c2a5c9 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -885,6 +885,12 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
+    @VisibleForTesting
+    public static void setColumnIndexSize(int val)
+    {
+        conf.column_index_size_in_kb = val;
+    }
+
     public static int getBatchSizeWarnThreshold()
     {
         return conf.batch_size_warn_threshold_in_kb * 1024;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 680b4b5..1a0b400 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -54,7 +55,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.HeapAllocator;
 
 
 /**
@@ -487,9 +487,9 @@ public class SinglePartitionReadCommand extends ReadCommand
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-
         List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
         ClusteringIndexFilter filter = clusteringIndexFilter();
+        long minTimestamp = Long.MAX_VALUE;
 
         try
         {
@@ -499,11 +499,14 @@ public class SinglePartitionReadCommand extends ReadCommand
                 if (partition == null)
                     continue;
 
+                minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
                 iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
             }
+
             /*
              * We can't eliminate full sstables based on the timestamp of what we've already read like
              * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
@@ -516,16 +519,13 @@ public class SinglePartitionReadCommand extends ReadCommand
              * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
              * in one pass, and minimize the number of sstables for which we read a partition tombstone.
              */
-            int sstablesIterated = 0;
             Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            List<SSTableReader> skippedSSTables = null;
             long mostRecentPartitionTombstone = Long.MIN_VALUE;
-            long minTimestamp = Long.MAX_VALUE;
             int nonIntersectingSSTables = 0;
+            List<SSTableReader> skippedSSTablesWithTombstones = null;
 
             for (SSTableReader sstable : view.sstables)
             {
-                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
                 // if we've already seen a partition tombstone with a timestamp greater
                 // than the most recent update to this sstable, we can skip it
                 if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
@@ -534,73 +534,55 @@ public class SinglePartitionReadCommand extends ReadCommand
                 if (!shouldInclude(sstable))
                 {
                     nonIntersectingSSTables++;
-                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
-                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
-                    {
-                        if (skippedSSTables == null)
-                            skippedSSTables = new ArrayList<>();
-                        skippedSSTables.add(sstable);
+                    if (sstable.hasTombstones())
+                    { // if sstable has tombstones we need to check after one pass if it can be safely skipped
+                        if (skippedSSTablesWithTombstones == null)
+                            skippedSSTablesWithTombstones = new ArrayList<>();
+                        skippedSSTablesWithTombstones.add(sstable);
                     }
                     continue;
                 }
 
-                sstable.incrementReadCount();
-                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-                UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());
+                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+
+                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
+                                              // or through the closing of the final merged iterator
+                UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, true);
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
-                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
-                sstablesIterated++;
+                iterators.add(iter);
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                        iter.partitionLevelDeletion().markedForDeleteAt());
             }
 
             int includedDueToTombstones = 0;
-            // Check for partition tombstones in the skipped sstables
-            if (skippedSSTables != null)
+            // Check for sstables with tombstones that are not expired
+            if (skippedSSTablesWithTombstones != null)
             {
-                for (SSTableReader sstable : skippedSSTables)
+                for (SSTableReader sstable : skippedSSTablesWithTombstones)
                 {
                     if (sstable.getMaxTimestamp() <= minTimestamp)
                         continue;
 
-                    sstable.incrementReadCount();
-                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
-                    UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());
-                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
-                    {
-                        iterators.add(iter);
-                        if (!sstable.isRepaired())
-                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                        includedDueToTombstones++;
-                        sstablesIterated++;
-                    }
-                    else
-                    {
-                        iter.close();
-                    }
+                    @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
+                                                  // or through the closing of the final merged iterator
+                    UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, false);
+                    if (!sstable.isRepaired())
+                        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+                    iterators.add(iter);
+                    includedDueToTombstones++;
                 }
             }
             if (Tracing.isTracing())
                 Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
-                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
-
-            cfs.metric.updateSSTableIterated(sstablesIterated);
+                               nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
 
             if (iterators.isEmpty())
                 return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 
-            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
-
-            @SuppressWarnings("resource") //  Closed through the closing of the result of that method.
-            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
-            if (!merged.isEmpty())
-            {
-                DecoratedKey key = merged.partitionKey();
-                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
-            }
-
-            return withStateTracking(merged);
+            return withStateTracking(withSSTablesIterated(iterators, cfs.metric));
         }
         catch (RuntimeException | Error e)
         {
@@ -627,6 +609,50 @@ public class SinglePartitionReadCommand extends ReadCommand
         return clusteringIndexFilter().shouldInclude(sstable);
     }
 
+    private UnfilteredRowIteratorWithLowerBound makeIterator(final SSTableReader sstable, boolean applyThriftTransformation)
+    {
+        return new UnfilteredRowIteratorWithLowerBound(partitionKey(),
+                                                       sstable,
+                                                       clusteringIndexFilter(),
+                                                       columnFilter(),
+                                                       isForThrift(),
+                                                       nowInSec(),
+                                                       applyThriftTransformation);
+    }
+
+    /**
+     * Return a wrapped iterator that when closed will update the sstables iterated and READ sample metrics.
+     * Note that we cannot use the Transformations framework because they greedily get the static row, which
+     * would cause all iterators to be initialized and hence all sstables to be accessed.
+     */
+    private UnfilteredRowIterator withSSTablesIterated(List<UnfilteredRowIterator> iterators,
+                                                       TableMetrics metrics)
+    {
+        @SuppressWarnings("resource") //  Closed through the closing of the result of the caller method.
+        UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+
+        if (!merged.isEmpty())
+        {
+            DecoratedKey key = merged.partitionKey();
+            metrics.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+        }
+
+        class UpdateSstablesIterated extends Transformation
+        {
+           public void onPartitionClose()
+           {
+               int sstablesIterated = (int)iterators.stream()
+                                                    .filter(it -> it instanceof LazilyInitializedUnfilteredRowIterator)
+                                                    .filter(it -> ((LazilyInitializedUnfilteredRowIterator)it).initialized())
+                                                    .count();
+
+               metrics.updateSSTableIterated(sstablesIterated);
+               Tracing.trace("Merged data from memtables and {} sstables", sstablesIterated);
+           }
+        };
+        return Transformation.apply(merged, new UpdateSstablesIterated());
+    }
+
     private boolean queryNeitherCountersNorCollections()
     {
         for (ColumnDefinition column : columnFilter().fetchedColumns())
@@ -693,8 +719,8 @@ public class SinglePartitionReadCommand extends ReadCommand
                 // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
                 // some seek in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
                 // has any tombstone at all as a shortcut.
-                if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
-                    continue; // Means no tombstone at all, we can skip that sstable
+                if (!sstable.hasTombstones())
+                    continue; // no tombstone at all, we can skip that sstable
 
                 // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
                 sstable.incrementReadCount();
@@ -711,7 +737,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
             Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
             sstable.incrementReadCount();
-            try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());)
+            try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
             {
                 if (iter.isEmpty())
                     continue;
@@ -741,13 +767,9 @@ public class SinglePartitionReadCommand extends ReadCommand
             try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
             {
                 final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
-                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
-                {
-                    public void run()
-                    {
-                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
-                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
-                    }
+                StageManager.getStage(Stage.MUTATION).execute(() -> {
+                    // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+                    Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
                 });
             }
         }
@@ -909,7 +931,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
         public static Group one(SinglePartitionReadCommand command)
         {
-            return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+            return new Group(Collections.singletonList(command), command.limits());
         }
 
         public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 792f5ad..d55161b 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -217,9 +216,7 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator
 
     public EncodingStats stats()
     {
-        // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
-        // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
+        return sstable.stats();
     }
 
     public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
index 1bf78dd..fc5bdbe 100644
--- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -42,12 +42,17 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIte
 
     protected abstract UnfilteredRowIterator initializeIterator();
 
-    private void maybeInit()
+    protected void maybeInit()
     {
         if (iterator == null)
             iterator = initializeIterator();
     }
 
+    public boolean initialized()
+    {
+        return iterator != null;
+    }
+
     public CFMetaData metadata()
     {
         maybeInit();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
new file mode 100644
index 0000000..4f55677
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -0,0 +1,212 @@
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.utils.IteratorWithLowerBound;
+
+/**
+ * An unfiltered row iterator with a lower bound retrieved from either the global
+ * sstable statistics or the row index lower bounds (if available in the cache).
+ * Before initializing the sstable unfiltered row iterator, we return an empty row
+ * with the clustering set to the lower bound. The empty row will be filtered out and
+ * the result is that if we don't need to access this sstable, i.e. due to the LIMIT conditon,
+ * then we will not. See CASSANDRA-8180 for examples of why this is useful.
+ */
+public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilteredRowIterator implements IteratorWithLowerBound<Unfiltered>
+{
+    private final SSTableReader sstable;
+    private final ClusteringIndexFilter filter;
+    private final ColumnFilter selectedColumns;
+    private final boolean isForThrift;
+    private final int nowInSec;
+    private final boolean applyThriftTransformation;
+    private RangeTombstone.Bound lowerBound;
+    private boolean firstItemRetrieved;
+
+    public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
+                                               SSTableReader sstable,
+                                               ClusteringIndexFilter filter,
+                                               ColumnFilter selectedColumns,
+                                               boolean isForThrift,
+                                               int nowInSec,
+                                               boolean applyThriftTransformation)
+    {
+        super(partitionKey);
+        this.sstable = sstable;
+        this.filter = filter;
+        this.selectedColumns = selectedColumns;
+        this.isForThrift = isForThrift;
+        this.nowInSec = nowInSec;
+        this.applyThriftTransformation = applyThriftTransformation;
+        this.lowerBound = null;
+        this.firstItemRetrieved = false;
+    }
+
+    public Unfiltered lowerBound()
+    {
+        if (lowerBound != null)
+            return makeBound(lowerBound);
+
+        // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only
+        // present if the iterator has already been initialized, which we only do when there are tombstones since in
+        // this case we cannot use the sstable metadata clustering values
+        RangeTombstone.Bound ret = getPartitionIndexLowerBound();
+        return ret != null ? makeBound(ret) : makeBound(getMetadataLowerBound());
+    }
+
+    private Unfiltered makeBound(RangeTombstone.Bound bound)
+    {
+        if (bound == null)
+            return null;
+
+        if (lowerBound != bound)
+            lowerBound = bound;
+
+        return new RangeTombstoneBoundMarker(lowerBound, DeletionTime.LIVE);
+    }
+
+    @Override
+    protected UnfilteredRowIterator initializeIterator()
+    {
+        sstable.incrementReadCount();
+
+        @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift);
+        return isForThrift && applyThriftTransformation
+               ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
+               : iter;
+    }
+
+    @Override
+    protected Unfiltered computeNext()
+    {
+        Unfiltered ret = super.computeNext();
+        if (firstItemRetrieved)
+            return ret;
+
+        // Check that the lower bound is not bigger than the first item retrieved
+        firstItemRetrieved = true;
+        if (lowerBound != null && ret != null)
+            assert comparator().compare(lowerBound, ret.clustering()) <= 0
+                : String.format("Lower bound [%s ]is bigger than first returned value [%s] for sstable %s",
+                                lowerBound.toString(sstable.metadata),
+                                ret.toString(sstable.metadata),
+                                sstable.getFilename());
+
+        return ret;
+    }
+
+    private Comparator<Clusterable> comparator()
+    {
+        return filter.isReversed() ? sstable.metadata.comparator.reversed() : sstable.metadata.comparator;
+    }
+
+    @Override
+    public CFMetaData metadata()
+    {
+        return sstable.metadata;
+    }
+
+    @Override
+    public boolean isReverseOrder()
+    {
+        return filter.isReversed();
+    }
+
+    @Override
+    public PartitionColumns columns()
+    {
+        return selectedColumns.fetchedColumns();
+    }
+
+    @Override
+    public EncodingStats stats()
+    {
+        return sstable.stats();
+    }
+
+    @Override
+    public DeletionTime partitionLevelDeletion()
+    {
+        if (!sstable.hasTombstones())
+            return DeletionTime.LIVE;
+
+        return super.partitionLevelDeletion();
+    }
+
+    @Override
+    public Row staticRow()
+    {
+        if (columns().statics.isEmpty())
+            return Rows.EMPTY_STATIC_ROW;
+
+        return super.staticRow();
+    }
+
+    /**
+     * @return the lower bound stored on the index entry for this partition, if available.
+     */
+    private RangeTombstone.Bound getPartitionIndexLowerBound()
+    {
+        // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than
+        // DatabaseDescriptor.column_index_size_in_kb)
+        if (!canUseMetadataLowerBound())
+            maybeInit();
+
+        RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false);
+        if (rowIndexEntry == null)
+            return null;
+
+        List<IndexHelper.IndexInfo> columns = rowIndexEntry.columnsIndex();
+        if (columns.size() == 0)
+            return null;
+
+        IndexHelper.IndexInfo column = columns.get(filter.isReversed() ? columns.size() - 1 : 0);
+        ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? column.lastName : column.firstName;
+        assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() :
+            String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
+                          lowerBoundPrefix.getRawValues().length,
+                          sstable.metadata.comparator.size(),
+                          sstable.getFilename());
+        return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues());
+    }
+
+    /**
+     * @return true if we can use the clustering values in the stats of the sstable:
+     * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size)
+     * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
+     */
+    private boolean canUseMetadataLowerBound()
+    {
+        return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile();
+    }
+
+    /**
+     * @return a global lower bound made from the clustering values stored in the sstable metadata, note that
+     * this currently does not correctly compare tombstone bounds, especially ranges.
+     */
+    private RangeTombstone.Bound getMetadataLowerBound()
+    {
+        if (!canUseMetadataLowerBound())
+            return null;
+
+        final StatsMetadata m = sstable.getSSTableMetadata();
+        List<ByteBuffer> vals = filter.isReversed() ? m.maxClusteringValues : m.minClusteringValues;
+        assert vals.size() <= sstable.metadata.comparator.size() :
+        String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
+                      vals.size(),
+                      sstable.metadata.comparator.size(),
+                      sstable.getFilename());
+        return  RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), vals.toArray(new ByteBuffer[vals.size()]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index e152540..495d831 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
@@ -1922,6 +1923,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return sstableMetadata.maxLocalDeletionTime;
     }
 
+    /** sstable contains no tombstones if maxLocalDeletionTime == Integer.MAX_VALUE */
+    public boolean hasTombstones()
+    {
+        // sstable contains no tombstone if minLocalDeletionTime is still set to  the default value Integer.MAX_VALUE
+        // which is bigger than any valid deletion times
+        return getMinLocalDeletionTime() != Integer.MAX_VALUE;
+    }
+
     public int getMinTTL()
     {
         return sstableMetadata.minTTL;
@@ -2072,6 +2081,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     }
 
+    public EncodingStats stats()
+    {
+        // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
+        // SerializationHeader.make() for details) so we use the latter instead.
+        return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL());
+    }
+
     public Ref<SSTableReader> tryRef()
     {
         return selfRef.tryRef();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IMergeIterator.java b/src/java/org/apache/cassandra/utils/IMergeIterator.java
index deddc4c..e45b897 100644
--- a/src/java/org/apache/cassandra/utils/IMergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/IMergeIterator.java
@@ -21,5 +21,6 @@ import java.util.Iterator;
 
 public interface IMergeIterator<In, Out> extends CloseableIterator<Out>
 {
+
     Iterable<? extends Iterator<In>> iterators();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
new file mode 100644
index 0000000..85eeede
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+public interface IteratorWithLowerBound<In>
+{
+    In lowerBound();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/MergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java
index 70daad9..c9e445b 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -200,7 +200,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
 
             reducer.onKeyChange();
             assert !heap[0].equalParent;
-            reducer.reduce(heap[0].idx, heap[0].consume());
+            heap[0].consume(reducer);
             final int size = this.size;
             final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE);
             int i;
@@ -209,7 +209,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
                 {
                     if (!heap[i].equalParent)
                         break consume;
-                    reducer.reduce(heap[i].idx, heap[i].consume());
+                    heap[i].consume(reducer);
                 }
                 i = Math.max(i, consumeHeap(i) + 1);
             }
@@ -227,7 +227,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
             if (idx >= size || !heap[idx].equalParent)
                 return -1;
 
-            reducer.reduce(heap[idx].idx, heap[idx].consume());
+            heap[idx].consume(reducer);
             int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1);
             return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1)));
         }
@@ -351,6 +351,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
         private final Comparator<? super In> comp;
         private final int idx;
         private In item;
+        private In lowerBound;
         boolean equalParent;
 
         public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
@@ -358,29 +359,55 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
             this.iter = iter;
             this.comp = comp;
             this.idx = idx;
+            this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null;
         }
 
         /** @return this if our iterator had an item, and it is now available, otherwise null */
         protected Candidate<In> advance()
         {
+            if (lowerBound != null)
+            {
+                item = lowerBound;
+                return this;
+            }
+
             if (!iter.hasNext())
                 return null;
+
             item = iter.next();
             return this;
         }
 
         public int compareTo(Candidate<In> that)
         {
-            assert item != null && that.item != null;
-            return comp.compare(this.item, that.item);
+            assert this.item != null && that.item != null;
+            int ret = comp.compare(this.item, that.item);
+            if (ret == 0 && (this.isLowerBound() ^ that.isLowerBound()))
+            {   // if the items are equal and one of them is a lower bound (but not the other one)
+                // then ensure the lower bound is less than the real item so we can safely
+                // skip lower bounds when consuming
+                return this.isLowerBound() ? -1 : 1;
+            }
+            return ret;
+        }
+
+        private boolean isLowerBound()
+        {
+            return item == lowerBound;
         }
 
-        public In consume()
+        public void consume(Reducer reducer)
         {
-            In temp = item;
-            item = null;
-            assert temp != null;
-            return temp;
+            if (isLowerBound())
+            {
+                item = null;
+                lowerBound = null;
+            }
+            else
+            {
+                reducer.reduce(idx, item);
+                item = null;
+            }
         }
 
         public boolean needsAdvance()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 71bc238..3c9cbbd 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -367,6 +367,12 @@ public abstract class CQLTester
              : Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable);
     }
 
+    public void flush(boolean forceFlush)
+    {
+        if (forceFlush)
+            flush();
+    }
+
     public void flush()
     {
         ColumnFamilyStore store = getCurrentColumnFamilyStore();
@@ -374,6 +380,12 @@ public abstract class CQLTester
             store.forceBlockingFlush();
     }
 
+    public void disableCompaction()
+    {
+        ColumnFamilyStore store = getCurrentColumnFamilyStore();
+        store.disableAutoCompaction();
+    }
+
     public void compact()
     {
         try
@@ -809,8 +821,17 @@ public abstract class CQLTester
         {
             while (iter.hasNext())
             {
-                iter.next();
+                UntypedResultSet.Row actual = iter.next();
                 i++;
+
+                StringBuilder str = new StringBuilder();
+                for (int j = 0; j < meta.size(); j++)
+                {
+                    ColumnSpecification column = meta.get(j);
+                    ByteBuffer actualValue = actual.getBytes(column.name.toString());
+                    str.append(String.format("%s=%s ", column.name, formatValue(actualValue, column.type)));
+                }
+                logger.info("Extra row num {}: {}", i, str.toString());
             }
             Assert.fail(String.format("Got more rows than expected. Expected %d but got %d.", rows.length, i));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
index cef6f1f..db7487e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java
@@ -37,9 +37,16 @@ public class StaticColumnsTest extends CQLTester
     @Test
     public void testStaticColumns() throws Throwable
     {
+        testStaticColumns(false);
+        testStaticColumns(true);
+    }
+
+    private void testStaticColumns(boolean forceFlush) throws Throwable
+    {
         createTable("CREATE TABLE %s ( k int, p int, s int static, v int, PRIMARY KEY (k, p))");
 
         execute("INSERT INTO %s(k, s) VALUES (0, 42)");
+        flush(forceFlush);
 
         assertRows(execute("SELECT * FROM %s"), row(0, null, 42, null));
 
@@ -51,6 +58,7 @@ public class StaticColumnsTest extends CQLTester
 
         execute("INSERT INTO %s (k, p, s, v) VALUES (0, 0, 12, 0)");
         execute("INSERT INTO %s (k, p, s, v) VALUES (0, 1, 24, 1)");
+        flush(forceFlush);
 
         // Check the static columns in indeed "static"
         assertRows(execute("SELECT * FROM %s"), row(0, 0, 24, 0), row(0, 1, 24, 1));
@@ -81,10 +89,12 @@ public class StaticColumnsTest extends CQLTester
 
         // Check that deleting a row don't implicitely deletes statics
         execute("DELETE FROM %s WHERE k=0 AND p=0");
+        flush(forceFlush);
         assertRows(execute("SELECT * FROM %s"),row(0, 1, 24, 1));
 
         // But that explicitely deleting the static column does remove it
         execute("DELETE s FROM %s WHERE k=0");
+        flush(forceFlush);
         assertRows(execute("SELECT * FROM %s"), row(0, 1, null, 1));
 
         // Check we can add a static column ...

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
new file mode 100644
index 0000000..720108a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
@@ -0,0 +1,455 @@
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.metrics.ClearableHistogram;
+
+/**
+ * Tests for checking how many sstables we access during cql queries with LIMIT specified,
+ * see CASSANDRA-8180.
+ */
+public class SSTablesIteratedTest extends CQLTester
+{
+    private void executeAndCheck(String query, int numSSTables, Object[]... rows) throws Throwable
+    {
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        ((ClearableHistogram) cfs.metric.sstablesPerReadHistogram.cf).clear(); // resets counts
+
+        assertRows(execute(query), rows);
+
+        assertEquals(numSSTables, cfs.metric.sstablesPerReadHistogram.cf.getSnapshot().getMax()); // max sstables read
+    }
+
+    @Override
+    protected String createTable(String query)
+    {
+        String ret = super.createTable(query);
+        disableCompaction();
+        return ret;
+    }
+
+    @Test
+    public void testSSTablesOnlyASC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
+    }
+
+    @Test
+    public void testMixedMemtableSStablesASC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 10, "10"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 10, "10"));
+    }
+
+    @Test
+    public void testOverlappingSStablesASC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
+    }
+
+    @Test
+    public void testSSTablesOnlyDESC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
+    }
+
+    @Test
+    public void testMixedMemtableSStablesDESC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 30, "30"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 30, "30"));
+    }
+
+    @Test
+    public void testOverlappingSStablesDESC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
+    }
+
+    @Test
+    public void testDeletionOnDifferentSSTables() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        flush();
+
+        execute("DELETE FROM %s WHERE id=1 and col=30");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 4, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 4, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 4, row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 2);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 3, row(1, 20, "20"));
+    }
+
+    @Test
+    public void testDeletionOnSameSSTable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        execute("DELETE FROM %s WHERE id=1 and col=30");
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 2, row(1, 20, "20"));
+    }
+
+    @Test
+    public void testDeletionOnMemTable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        execute("DELETE FROM %s WHERE id=1 and col=30");
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 20, "20"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 20, "20"), row(1, 10, "10"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 20, "20"), row(1, 10, "10"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 20, "20"));
+    }
+
+    @Test
+    public void testDeletionOnIndexedSSTableDESC() throws Throwable
+    {
+        testDeletionOnIndexedSSTableDESC(true);
+        testDeletionOnIndexedSSTableDESC(false);
+    }
+
+    private void testDeletionOnIndexedSSTableDESC(boolean deleteWithRange) throws Throwable
+    {
+        // reduce the column index size so that columns get indexed during flush
+        DatabaseDescriptor.setColumnIndexSize(1);
+
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        for (int i = 1; i <= 1000; i++)
+        {
+            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+        }
+        flush();
+
+        Object[][] allRows = new Object[1000][];
+        for (int i = 1001; i <= 2000; i++)
+        {
+            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            allRows[2000 - i] = row(1, i, Integer.toString(i));
+        }
+
+        if (deleteWithRange)
+        {
+            execute("DELETE FROM %s WHERE id=1 and col <= ?", 1000);
+        }
+        else
+        {
+            for (int i = 1; i <= 1000; i++)
+                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+        }
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 2000, "2000"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 2000, "2000"), row(1, 1999, "1999"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 1, row(1, 2000, "2000"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 1, row(1, 2000, "2000"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 1, allRows);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 2, allRows);
+    }
+
+    @Test
+    public void testDeletionOnIndexedSSTableASC() throws Throwable
+    {
+        testDeletionOnIndexedSSTableASC(true);
+        testDeletionOnIndexedSSTableASC(false);
+    }
+
+    private void testDeletionOnIndexedSSTableASC(boolean deleteWithRange) throws Throwable
+    {
+        // reduce the column index size so that columns get indexed during flush
+        DatabaseDescriptor.setColumnIndexSize(1);
+
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+        for (int i = 1; i <= 1000; i++)
+        {
+            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+        }
+        flush();
+
+        Object[][] allRows = new Object[1000][];
+        for (int i = 1001; i <= 2000; i++)
+        {
+            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            allRows[i - 1001] = row(1, i, Integer.toString(i));
+        }
+        flush();
+
+        if (deleteWithRange)
+        {
+            execute("DELETE FROM %s WHERE id =1 and col <= ?", 1000);
+        }
+        else
+        {
+            for (int i = 1; i <= 1000; i++)
+                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+        }
+        flush();
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 1001, "1001"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 1001, "1001"), row(1, 1002, "1002"));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, allRows);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 3, row(1, 1001, "1001"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 2, allRows);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 3, allRows);
+    }
+
+    @Test
+    public void testDeletionOnOverlappingIndexedSSTable() throws Throwable
+    {
+        testDeletionOnOverlappingIndexedSSTable(true);
+        testDeletionOnOverlappingIndexedSSTable(false);
+    }
+
+    private void testDeletionOnOverlappingIndexedSSTable(boolean deleteWithRange) throws Throwable
+    {
+        // reduce the column index size so that columns get indexed during flush
+        DatabaseDescriptor.setColumnIndexSize(1);
+
+        createTable("CREATE TABLE %s (id int, col int, val1 text, val2 text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
+
+        for (int i = 1; i <= 500; i++)
+        {
+            if (i % 2 == 0)
+                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            else
+                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+        }
+
+        for (int i = 1001; i <= 1500; i++)
+        {
+            if (i % 2 == 0)
+                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            else
+                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+        }
+
+        flush();
+
+        for (int i = 501; i <= 1000; i++)
+        {
+            if (i % 2 == 0)
+                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            else
+                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+        }
+
+        for (int i = 1501; i <= 2000; i++)
+        {
+            if (i % 2 == 0)
+                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
+            else
+                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
+        }
+
+        if (deleteWithRange)
+        {
+            execute("DELETE FROM %s WHERE id=1 and col > ? and col <= ?", 250, 750);
+        }
+        else
+        {
+            for (int i = 251; i <= 750; i++)
+                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
+        }
+
+        flush();
+
+        Object[][] allRows = new Object[1500][]; // non deleted rows
+        for (int i = 1; i <= 2000; i++)
+        {
+            if (i > 250 && i <= 750)
+                continue; // skip deleted records
+
+            int idx = (i <= 250 ? i - 1 : i - 501);
+
+            if (i % 2 == 0)
+                allRows[idx] = row(1, i, Integer.toString(i), null);
+            else
+                allRows[idx] = row(1, i, Integer.toString(i), Integer.toString(i));
+        }
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 1, "1", "1"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 1, "1", "1"), row(1, 2, "2", null));
+
+        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001", "1001"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 2, row(1, 1, "1", "1"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 500 LIMIT 1", 2, row(1, 751, "751", "751"));
+        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 500 LIMIT 1", 2, row(1, 1, "1", "1"));
+    }
+
+    @Test
+    public void testMultiplePartitionsDESC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 10, "10");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 10, "10");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 20, "20");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 20, "20");
+        flush();
+
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 30, "30");
+        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 30, "30");
+        flush();
+
+        for (int i = 1; i <= 3; i++)
+        {
+            String base = "SELECT * FROM %s ";
+
+            executeAndCheck(base + String.format("WHERE id=%d LIMIT 1", i), 1, row(i, 30, "30"));
+            executeAndCheck(base + String.format("WHERE id=%d LIMIT 2", i), 2, row(i, 30, "30"), row(i, 20, "20"));
+            executeAndCheck(base + String.format("WHERE id=%d LIMIT 3", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
+            executeAndCheck(base + String.format("WHERE id=%d", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
+
+            executeAndCheck(base + String.format("WHERE id=%d AND col > 25 LIMIT 1", i), 1, row(i, 30, "30"));
+            executeAndCheck(base + String.format("WHERE id=%d AND col < 40 LIMIT 1", i), 1, row(i, 30, "30"));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
index da0bc33..170f85f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
@@ -326,6 +326,27 @@ public class DeleteTest extends CQLTester
 
         assertEmpty(execute("select * from %s  where a=1 and b=1"));
     }
+
+    /** Test that two deleted rows for the same partition but on different sstables do not resurface */
+    @Test
+    public void testDeletedRowsDoNotResurface() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')");
+        flush();
+
+        execute("DELETE FROM %s where a=1 and b = 1");
+        flush();
+
+        execute("DELETE FROM %s where a=1 and b = 2");
+        flush();
+
+        assertRows(execute("SELECT * FROM %s WHERE a = ?", 1),
+                   row(1, 3, "3"));
+    }
+
     @Test
     public void testDeleteWithNoClusteringColumns() throws Throwable
     {
@@ -624,6 +645,39 @@ public class DeleteTest extends CQLTester
     }
 
     @Test
+    public void testDeleteWithNonoverlappingRange() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (a, b, c) VALUES(1, ?, 'abc')", i);
+        flush();
+
+        execute("DELETE FROM %s WHERE a=1 and b <= 3");
+        flush();
+
+        // this query does not overlap the tombstone range above and caused the rows to be resurrected
+        assertEmpty(execute("SELECT * FROM %s WHERE a=1 and b <= 2"));
+    }
+
+    @Test
+    public void testDeleteWithIntermediateRangeAndOneClusteringColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')");
+        execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3");
+        execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')");
+        flush();
+
+        execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3");
+        flush();
+
+        assertRows(execute("SELECT * FROM %s WHERE a = ?", 1),
+                   row(1, 1, "1"));
+    }
+
+    @Test
     public void testDeleteWithRangeAndOneClusteringColumn() throws Throwable
     {
         testDeleteWithRangeAndOneClusteringColumn(false);
@@ -1057,10 +1111,4 @@ public class DeleteTest extends CQLTester
         compact();
         assertRows(execute("SELECT * FROM %s"), row(0, null));
     }
-
-    private void flush(boolean forceFlush)
-    {
-        if (forceFlush)
-            flush();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
index 3c49989..ff98f6b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
@@ -283,10 +283,4 @@ public class InsertTest extends CQLTester
         assertInvalidMessage("Some clustering keys are missing: clustering_1",
                              "INSERT INTO %s (partitionKey, clustering_2, staticValue) VALUES (0, 0, 'A')");
     }
-
-    private void flush(boolean forceFlush)
-    {
-        if (forceFlush)
-            flush();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
index b939b7f..2df6fd6 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java
@@ -524,10 +524,4 @@ public class UpdateTest extends CQLTester
 
         assertRows(execute("SELECT l FROM %s WHERE k = 0"), row(list("v1", "v4", "v3")));
     }
-
-    private void flush(boolean forceFlush)
-    {
-        if (forceFlush)
-            flush();
-    }
 }