You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2017/06/01 08:24:17 UTC

[6/8] cassandra git commit: Merge branch cassandra-3.0 into cassandra-3.11

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 4536036,0000000..84a742b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@@ -1,261 -1,0 +1,263 @@@
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +package org.apache.cassandra.db.rows;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Comparator;
 +import java.util.List;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.io.sstable.IndexInfo;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.thrift.ThriftResultsMerger;
 +import org.apache.cassandra.utils.IteratorWithLowerBound;
 +
 +/**
 + * An unfiltered row iterator with a lower bound retrieved from either the global
 + * sstable statistics or the row index lower bounds (if available in the cache).
 + * Before initializing the sstable unfiltered row iterator, we return an empty row
 + * with the clustering set to the lower bound. The empty row will be filtered out and
 + * the result is that if we don't need to access this sstable, i.e. due to the LIMIT conditon,
 + * then we will not. See CASSANDRA-8180 for examples of why this is useful.
 + */
 +public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilteredRowIterator implements IteratorWithLowerBound<Unfiltered>
 +{
 +    private final SSTableReader sstable;
 +    private final ClusteringIndexFilter filter;
 +    private final ColumnFilter selectedColumns;
 +    private final boolean isForThrift;
 +    private final int nowInSec;
 +    private final boolean applyThriftTransformation;
++    private final SSTableReadsListener listener;
 +    private ClusteringBound lowerBound;
 +    private boolean firstItemRetrieved;
 +
 +    public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
 +                                               SSTableReader sstable,
 +                                               ClusteringIndexFilter filter,
 +                                               ColumnFilter selectedColumns,
 +                                               boolean isForThrift,
 +                                               int nowInSec,
-                                                boolean applyThriftTransformation)
++                                               boolean applyThriftTransformation,
++                                               SSTableReadsListener listener)
 +    {
 +        super(partitionKey);
 +        this.sstable = sstable;
 +        this.filter = filter;
 +        this.selectedColumns = selectedColumns;
 +        this.isForThrift = isForThrift;
 +        this.nowInSec = nowInSec;
 +        this.applyThriftTransformation = applyThriftTransformation;
++        this.listener = listener;
 +        this.lowerBound = null;
 +        this.firstItemRetrieved = false;
 +    }
 +
 +    public Unfiltered lowerBound()
 +    {
 +        if (lowerBound != null)
 +            return makeBound(lowerBound);
 +
 +        // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only
 +        // present if the iterator has already been initialized, which we only do when there are tombstones since in
 +        // this case we cannot use the sstable metadata clustering values
 +        ClusteringBound ret = getPartitionIndexLowerBound();
 +        return ret != null ? makeBound(ret) : makeBound(getMetadataLowerBound());
 +    }
 +
 +    private Unfiltered makeBound(ClusteringBound bound)
 +    {
 +        if (bound == null)
 +            return null;
 +
 +        if (lowerBound != bound)
 +            lowerBound = bound;
 +
 +        return new RangeTombstoneBoundMarker(lowerBound, DeletionTime.LIVE);
 +    }
 +
 +    @Override
 +    protected UnfilteredRowIterator initializeIterator()
 +    {
-         sstable.incrementReadCount();
- 
 +        @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-         UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift);
++        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift, listener);
 +        return isForThrift && applyThriftTransformation
 +               ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
 +               : iter;
 +    }
 +
 +    @Override
 +    protected Unfiltered computeNext()
 +    {
 +        Unfiltered ret = super.computeNext();
 +        if (firstItemRetrieved)
 +            return ret;
 +
 +        // Check that the lower bound is not bigger than the first item retrieved
 +        firstItemRetrieved = true;
 +        if (lowerBound != null && ret != null)
 +            assert comparator().compare(lowerBound, ret.clustering()) <= 0
 +                : String.format("Lower bound [%s ]is bigger than first returned value [%s] for sstable %s",
 +                                lowerBound.toString(sstable.metadata),
 +                                ret.toString(sstable.metadata),
 +                                sstable.getFilename());
 +
 +        return ret;
 +    }
 +
 +    private Comparator<Clusterable> comparator()
 +    {
 +        return filter.isReversed() ? sstable.metadata.comparator.reversed() : sstable.metadata.comparator;
 +    }
 +
 +    @Override
 +    public CFMetaData metadata()
 +    {
 +        return sstable.metadata;
 +    }
 +
 +    @Override
 +    public boolean isReverseOrder()
 +    {
 +        return filter.isReversed();
 +    }
 +
 +    @Override
 +    public PartitionColumns columns()
 +    {
 +        return selectedColumns.fetchedColumns();
 +    }
 +
 +    @Override
 +    public EncodingStats stats()
 +    {
 +        return sstable.stats();
 +    }
 +
 +    @Override
 +    public DeletionTime partitionLevelDeletion()
 +    {
 +        if (!sstable.mayHaveTombstones())
 +            return DeletionTime.LIVE;
 +
 +        return super.partitionLevelDeletion();
 +    }
 +
 +    @Override
 +    public Row staticRow()
 +    {
 +        if (columns().statics.isEmpty())
 +            return Rows.EMPTY_STATIC_ROW;
 +
 +        return super.staticRow();
 +    }
 +
 +    /**
 +     * @return the lower bound stored on the index entry for this partition, if available.
 +     */
 +    private ClusteringBound getPartitionIndexLowerBound()
 +    {
 +        // NOTE: CASSANDRA-11206 removed the lookup against the key-cache as the IndexInfo objects are no longer
 +        // in memory for not heap backed IndexInfo objects (so, these are on disk).
 +        // CASSANDRA-11369 is there to fix this afterwards.
 +
 +        // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than
 +        // DatabaseDescriptor.column_index_size_in_kb)
 +        if (!canUseMetadataLowerBound())
 +            maybeInit();
 +
 +        RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false);
 +        if (rowIndexEntry == null || !rowIndexEntry.indexOnHeap())
 +            return null;
 +
 +        try (RowIndexEntry.IndexInfoRetriever onHeapRetriever = rowIndexEntry.openWithIndex(null))
 +        {
 +            IndexInfo column = onHeapRetriever.columnsIndex(filter.isReversed() ? rowIndexEntry.columnsIndexCount() - 1 : 0);
 +            ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? column.lastName : column.firstName;
 +            assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() :
 +            String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
 +                          lowerBoundPrefix.getRawValues().length,
 +                          sstable.metadata.comparator.size(),
 +                          sstable.getFilename());
 +            return ClusteringBound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues());
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException("should never occur", e);
 +        }
 +    }
 +
 +    /**
 +     * Whether we can use the clustering values in the stats of the sstable to build the lower bound.
 +     * <p>
 +     * Currently, the clustering values of the stats file records for each clustering component the min and max
 +     * value seen, null excluded. In other words, having a non-null value for a component in those min/max clustering
 +     * values does _not_ guarantee that there isn't an unfiltered in the sstable whose clustering has either no value for
 +     * that component (it's a prefix) or a null value.
 +     * <p>
 +     * This is problematic as this means we can't in general build a lower bound from those values since the "min"
 +     * values doesn't actually guarantee minimality.
 +     * <p>
 +     * However, we can use those values if we can guarantee that no clustering in the sstable 1) is a true prefix and
 +     * 2) uses null values. Nat having true prefixes means having no range tombstone markers since rows use
 +     * {@link Clustering} which is always "full" (all components are always present). As for null values, we happen to
 +     * only allow those in compact tables (for backward compatibility), so we can simply exclude those tables.
 +     * <p>
 +     * Note that the information we currently have at our disposal make this condition less precise that it could be.
 +     * In particular, {@link SSTableReader#mayHaveTombstones} could return {@code true} (making us not use the stats)
 +     * because of cell tombstone or even expiring cells even if the sstable has no range tombstone markers, even though
 +     * it's really only markers we want to exclude here (more precisely, as said above, we want to exclude anything
 +     * whose clustering is not "full", but that's only markers). It wouldn't be very hard to collect whether a sstable
 +     * has any range tombstone marker however so it's a possible improvement.
 +     */
 +    private boolean canUseMetadataLowerBound()
 +    {
 +        // Side-note: pre-2.1 sstable stat file had clustering value arrays whose size may not match the comparator size
 +        // and that would break getMetadataLowerBound. We don't support upgrade from 2.0 to 3.0 directly however so it's
 +        // not a true concern. Besides, !sstable.mayHaveTombstones already ensure this is a 3.0 sstable anyway.
 +        return !sstable.mayHaveTombstones() && !sstable.metadata.isCompactTable();
 +    }
 +
 +    /**
 +     * @return a global lower bound made from the clustering values stored in the sstable metadata, note that
 +     * this currently does not correctly compare tombstone bounds, especially ranges.
 +     */
 +    private ClusteringBound getMetadataLowerBound()
 +    {
 +        if (!canUseMetadataLowerBound())
 +            return null;
 +
 +        final StatsMetadata m = sstable.getSSTableMetadata();
 +        List<ByteBuffer> vals = filter.isReversed() ? m.maxClusteringValues : m.minClusteringValues;
 +        assert vals.size() <= sstable.metadata.comparator.size() :
 +        String.format("Unexpected number of clustering values %d, expected %d or fewer for %s",
 +                      vals.size(),
 +                      sstable.metadata.comparator.size(),
 +                      sstable.getFilename());
 +        return  ClusteringBound.inclusiveOpen(filter.isReversed(), vals.toArray(new ByteBuffer[vals.size()]));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index f6f30db,f38738d..e9b2491
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1524,31 -1501,48 +1524,56 @@@ public abstract class SSTableReader ext
      }
  
      /**
--     * Get position updating key cache and stats.
--     * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean)
++     * Retrieves the position while updating the key cache and the stats.
++     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
++     * allow key selection by token bounds but only if op != * EQ
++     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+      */
+     public final RowIndexEntry getPosition(PartitionPosition key, Operator op)
+     {
+         return getPosition(key, op, SSTableReadsListener.NOOP_LISTENER);
+     }
+ 
++    /**
++     * Retrieves the position while updating the key cache and the stats.
++     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
++     * allow key selection by token bounds but only if op != * EQ
++     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
++     * @param listener the {@code SSTableReaderListener} that must handle the notifications.
 +     */
-     public RowIndexEntry getPosition(PartitionPosition key, Operator op)
+     public final RowIndexEntry getPosition(PartitionPosition key, Operator op, SSTableReadsListener listener)
      {
-         return getPosition(key, op, true, false);
+         return getPosition(key, op, true, false, listener);
      }
  
-     public RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats)
+     public final RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats)
      {
-         return getPosition(key, op, updateCacheAndStats, false);
+         return getPosition(key, op, updateCacheAndStats, false, SSTableReadsListener.NOOP_LISTENER);
      }
      /**
       * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
       * allow key selection by token bounds but only if op != * EQ
       * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
       * @param updateCacheAndStats true if updating stats and cache
++     * @param listener a listener used to handle internal events
       * @return The index entry corresponding to the key, or null if the key is not present
       */
-     protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
+     protected abstract RowIndexEntry getPosition(PartitionPosition key,
+                                                  Operator op,
+                                                  boolean updateCacheAndStats,
+                                                  boolean permitMatchPastLast,
+                                                  SSTableReadsListener listener);
+ 
 -    public abstract SliceableUnfilteredRowIterator iterator(DecoratedKey key,
 -                                                            ColumnFilter selectedColumns,
 -                                                            boolean reversed,
 -                                                            boolean isForThrift,
 -                                                            SSTableReadsListener listener);
++    public abstract UnfilteredRowIterator iterator(DecoratedKey key,
++                                                   Slices slices,
++                                                   ColumnFilter selectedColumns,
++                                                   boolean reversed,
++                                                   boolean isForThrift,
++                                                   SSTableReadsListener listener);
 +
-     public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
 +    public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
  
 -    public abstract SliceableUnfilteredRowIterator iterator(FileDataInput file,
 -                                                            DecoratedKey key,
 -                                                            RowIndexEntry indexEntry,
 -                                                            ColumnFilter selectedColumns,
 -                                                            boolean reversed,
 -                                                            boolean isForThrift);
 +    public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
  
      /**
       * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
@@@ -2017,23 -1990,40 +2051,11 @@@
              readMeter.mark();
      }
  
-     private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2)
 -    /**
 -     * Checks if this sstable can overlap with another one based on the min/man clustering values.
 -     * If this methods return false, we're guarantee that {@code this} and {@code other} have no overlapping
 -     * data, i.e. no cells to reconcile.
 -     */
 -    public boolean mayOverlapsWith(SSTableReader other)
--    {
-         ClusteringComparator comparator = metadata.comparator;
-         for (int i = 0; i < Math.min(values1.size(), values2.size()); i++)
-         {
-             int cmp = comparator.subtype(i).compare(values1.get(i), values2.get(i));
-             if (cmp != 0)
-                 return cmp;
-         }
-         return 0;
 -        StatsMetadata m1 = getSSTableMetadata();
 -        StatsMetadata m2 = other.getSSTableMetadata();
 -
 -        if (m1.minClusteringValues.isEmpty() || m1.maxClusteringValues.isEmpty() || m2.minClusteringValues.isEmpty() || m2.maxClusteringValues.isEmpty())
 -            return true;
 -
 -        return !(compare(m1.maxClusteringValues, m2.minClusteringValues) < 0 || compare(m1.minClusteringValues, m2.maxClusteringValues) > 0);
--    }
--
 -    private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2)
 +    public EncodingStats stats()
      {
 -        ClusteringComparator comparator = metadata.comparator;
 -        for (int i = 0; i < Math.min(values1.size(), values2.size()); i++)
 -        {
 -            int cmp = comparator.subtype(i).compare(values1.get(i), values2.get(i));
 -            if (cmp != 0)
 -                return cmp;
 -        }
 -        return 0;
 -    }
 -
 -    public static class SizeComparator implements Comparator<SSTableReader>
 -    {
 -        public int compare(SSTableReader o1, SSTableReader o2)
 -        {
 -            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 -        }
 +        // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
 +        // SerializationHeader.make() for details) so we use the latter instead.
 +        return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL());
      }
  
      public Ref<SSTableReader> tryRef()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
index 0000000,6d384bf..8f6e3c0
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
@@@ -1,0 -1,81 +1,82 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.io.sstable.format;
+ 
+ import org.apache.cassandra.db.RowIndexEntry;
++import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
+ 
+ /**
+  * Listener for receiving notifications associated with reading SSTables.
+  */
+ public interface SSTableReadsListener
+ {
+     /**
+      * The reasons for skipping an SSTable
+      */
+     enum SkippingReason
+     {
+         BLOOM_FILTER,
+         MIN_MAX_KEYS,
+         PARTITION_INDEX_LOOKUP,
+         INDEX_ENTRY_NOT_FOUND;
+     }
+ 
+     /**
+      * The reasons for selecting an SSTable
+      */
+     enum SelectionReason
+     {
+         KEY_CACHE_HIT,
+         INDEX_ENTRY_FOUND;
+     }
+ 
+     /**
+      * Listener that does nothing.
+      */
+     static final SSTableReadsListener NOOP_LISTENER = new SSTableReadsListener() {};
+ 
+     /**
+      * Handles notification that the specified SSTable has been skipped during a single partition query.
+      *
+      * @param sstable the SSTable reader
+      * @param reason the reason for which the SSTable has been skipped
+      */
+     default void onSSTableSkipped(SSTableReader sstable, SkippingReason reason)
+     {
+     }
+ 
+     /**
+      * Handles notification that the specified SSTable has been selected during a single partition query.
+      *
+      * @param sstable the SSTable reader
+      * @param indexEntry the index entry
+      * @param reason the reason for which the SSTable has been selected
+      */
+     default void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> indexEntry, SelectionReason reason)
+     {
+     }
+ 
+     /**
+      * Handles notification that the specified SSTable is being scanned during a partition range query.
+      *
+      * @param sstable the SSTable reader of the SSTable being scanned.
+      */
+     default void onScanningStarted(SSTableReader sstable)
+     {
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 8c64b01,eeea18f..8551819
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -30,11 -28,16 +30,13 @@@ import org.apache.cassandra.db.rows.Unf
  import org.apache.cassandra.dht.AbstractBounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.Component;
 -import org.apache.cassandra.io.sstable.CorruptSSTableException;
 -import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
  import org.apache.cassandra.io.util.FileDataInput;
- import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.slf4j.Logger;
@@@ -57,29 -60,32 +59,29 @@@ public class BigTableReader extends SST
          super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
      }
  
-     public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
 -    public SliceableUnfilteredRowIterator iterator(DecoratedKey key,
 -                                                   ColumnFilter selectedColumns,
 -                                                   boolean reversed,
 -                                                   boolean isForThrift,
 -                                                   SSTableReadsListener listener)
++    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, SSTableReadsListener listener)
      {
-         RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ);
 -        return reversed
 -             ? new SSTableReversedIterator(this, key, selectedColumns, isForThrift, listener)
 -             : new SSTableIterator(this, key, selectedColumns, isForThrift, listener);
++        RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ, listener);
 +        return iterator(null, key, rie, slices, selectedColumns, reversed, isForThrift);
      }
  
 -    public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
 +    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
      {
 +        if (indexEntry == null)
 +            return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
          return reversed
 -             ? new SSTableReversedIterator(this, file, key, indexEntry, selectedColumns, isForThrift)
 -             : new SSTableIterator(this, file, key, indexEntry, selectedColumns, isForThrift);
 +             ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile)
 +             : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile);
      }
  
-     /**
-      * @param columns the columns to return.
-      * @param dataRange filter to use when reading the columns
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
-     public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
+     @Override
+     public ISSTableScanner getScanner(ColumnFilter columns,
+                                       DataRange dataRange,
+                                       RateLimiter limiter,
+                                       boolean isForThrift,
+                                       SSTableReadsListener listener)
      {
-         return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift);
+         return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift, listener);
      }
  
      /**
@@@ -118,21 -124,19 +120,19 @@@
      }
  
  
 -    /**
 -     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 -     * allow key selection by token bounds but only if op != * EQ
 -     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 -     * @param updateCacheAndStats true if updating stats and cache
 -     * @param listener a listener used to handle internal events
 -     * @return The index entry corresponding to the key, or null if the key is not present
 -     */
 +    @SuppressWarnings("resource") // caller to close
 +    @Override
 +    public UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly)
 +    {
 +        return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly);
 +    }
 +
-     /**
-      * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
-      * allow key selection by token bounds but only if op != * EQ
-      * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
-      * @param updateCacheAndStats true if updating stats and cache
-      * @return The index entry corresponding to the key, or null if the key is not present
-      */
-     protected RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
++    @Override
+     protected RowIndexEntry getPosition(PartitionPosition key,
+                                         Operator op,
+                                         boolean updateCacheAndStats,
+                                         boolean permitMatchPastLast,
+                                         SSTableReadsListener listener)
      {
          if (op == Operator.EQ)
          {
@@@ -257,7 -265,8 +261,8 @@@
                      }
                      if (op == Operator.EQ && updateCacheAndStats)
                          bloomFilterTracker.addTruePositive();
+                     listener.onSSTableSelected(this, indexEntry, SelectionReason.INDEX_ENTRY_FOUND);
 -                    Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
 +                    Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndexCount(), descriptor.generation);
                      return indexEntry;
                  }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index e465a02,82d8211..f4bd1ea
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -62,20 -63,30 +63,29 @@@ public class BigTableScanner implement
      private final DataRange dataRange;
      private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
      private final boolean isForThrift;
+     private final SSTableReadsListener listener;
 +    private long startScan = -1;
 +    private long bytesScanned = 0;
  
      protected Iterator<UnfilteredRowIterator> iterator;
  
      // Full scan of the sstables
      public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
      {
-         return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, Iterators.singletonIterator(fullRange(sstable)));
 -        return new BigTableScanner(sstable, limiter, Iterators.singletonIterator(fullRange(sstable)));
++        return new BigTableScanner(sstable,
++                                   ColumnFilter.all(sstable.metadata),
++                                   limiter,
++                                   Iterators.singletonIterator(fullRange(sstable)));
      }
  
-     public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
+     public static ISSTableScanner getScanner(SSTableReader sstable,
+                                              ColumnFilter columns,
+                                              DataRange dataRange,
+                                              RateLimiter limiter,
+                                              boolean isForThrift,
+                                              SSTableReadsListener listener)
      {
-         return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator());
 -        return new BigTableScanner(sstable,
 -                                   columns,
 -                                   dataRange,
 -                                   limiter,
 -                                   isForThrift,
 -                                   makeBounds(sstable, dataRange).iterator(),
 -                                   listener);
++        return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator(), listener);
      }
  
      public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
@@@ -85,15 -96,28 +95,32 @@@
          if (positions.isEmpty())
              return new EmptySSTableScanner(sstable);
  
-         return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator());
 -        return new BigTableScanner(sstable, limiter, makeBounds(sstable, tokenRanges).iterator());
++        return new BigTableScanner(sstable,
++                                   ColumnFilter.all(sstable.metadata),
++                                   limiter,
++                                   makeBounds(sstable, tokenRanges).iterator());
      }
  
      public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
      {
-         return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, null, false, rangeIterator);
 -        return new BigTableScanner(sstable, null, rangeIterator);
++        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator);
      }
  
-     private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+     private BigTableScanner(SSTableReader sstable,
++                            ColumnFilter columns,
+                             RateLimiter limiter,
+                             Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+     {
 -        this(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, rangeIterator, SSTableReadsListener.NOOP_LISTENER);
++        this(sstable, columns, null, limiter, false, rangeIterator, SSTableReadsListener.NOOP_LISTENER);
+     }
+ 
+     private BigTableScanner(SSTableReader sstable,
+                             ColumnFilter columns,
+                             DataRange dataRange,
+                             RateLimiter limiter,
+                             boolean isForThrift,
+                             Iterator<AbstractBounds<PartitionPosition>> rangeIterator,
+                             SSTableReadsListener listener)
      {
          assert sstable != null;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
index fd9f221,2cf518a..d363ecf
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
@@@ -71,422 -69,68 +70,488 @@@ public class SSTablesIteratedTest exten
      }
  
      @Test
+     public void testSinglePartitionQuery() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk int, c int, v text, PRIMARY KEY (pk, c))");
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 1, 40, "41");
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 2, 10, "12");
+         flush();
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 1, 10, "11");
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 3, 30, "33");
+         flush();
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 1, 20, "21");
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", 2, 40, "42");
+         execute("UPDATE %s SET v = '12' WHERE pk = 2 AND c = 10");
+         flush();
+ 
+         // Test with all the table being merged
+         executeAndCheck("SELECT * FROM %s WHERE pk = 1", 3,
+                         row(1, 10, "11"),
+                         row(1, 20, "21"),
+                         row(1, 40, "41"));
+ 
+         // Test with only 2 of the 3 SSTables being merged
+         executeAndCheck("SELECT * FROM %s WHERE pk = 2", 2,
+                         row(2, 10, "12"),
+                         row(2, 40, "42"));
+ 
+         executeAndCheck("SELECT * FROM %s WHERE pk = 2 ORDER BY c DESC", 2,
+                         row(2, 40, "42"),
+                         row(2, 10, "12"));
+ 
+         // Test with only 2 of the 3 SSTables being merged and a Slice filter
+         executeAndCheck("SELECT * FROM %s WHERE pk = 2 AND c > 20", 2,
+                         row(2, 40, "42"));
+ 
+         executeAndCheck("SELECT * FROM %s WHERE pk = 2 AND c > 20 ORDER BY c DESC", 2,
+                         row(2, 40, "42"));
+ 
+         // Test with only 2 of the 3 SSTables being merged and a Name filter
+         // This test checks the SinglePartitionReadCommand::queryMemtableAndSSTablesInTimestampOrder which is only
+         // used for ClusteringIndexNamesFilter when there are no multi-cell columns
+         executeAndCheck("SELECT * FROM %s WHERE pk = 2 AND c = 10", 2,
+                         row(2, 10, "12"));
+ 
+         // For partition range queries the metric must not be updated. The reason being that range queries simply
+         // scan all the SSTables containing data within the partition range. Due to that they might pollute the metric
+         // and give a wrong view of the system.
+         executeAndCheck("SELECT * FROM %s", 0,
+                         row(1, 10, "11"),
+                         row(1, 20, "21"),
+                         row(1, 40, "41"),
+                         row(2, 10, "12"),
+                         row(2, 40, "42"),
+                         row(3, 30, "33"));
+ 
+         executeAndCheck("SELECT * FROM %s WHERE token(pk) = token(1)", 0,
+                         row(1, 10, "11"),
+                         row(1, 20, "21"),
+                         row(1, 40, "41"));
+ 
+         assertInvalidMessage("ORDER BY is only supported when the partition key is restricted by an EQ or an IN",
+                              "SELECT * FROM %s WHERE token(pk) = token(1) ORDER BY C DESC");
+     }
 -}
++
++    @Test
 +    public void testSSTablesOnlyASC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
 +    }
 +
 +    @Test
 +    public void testMixedMemtableSStablesASC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 10, "10"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 10, "10"));
 +    }
 +
 +    @Test
 +    public void testOverlappingSStablesASC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10"));
 +    }
 +
 +    @Test
 +    public void testSSTablesOnlyDESC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
 +    }
 +
 +    @Test
 +    public void testMixedMemtableSStablesDESC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 30, "30"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 30, "30"));
 +    }
 +
 +    @Test
 +    public void testOverlappingSStablesDESC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30"));
 +    }
 +
 +    @Test
 +    public void testDeletionOnDifferentSSTables() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        flush();
 +
 +        execute("DELETE FROM %s WHERE id=1 and col=30");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 4, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 4, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 4, row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 2);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 3, row(1, 20, "20"));
 +    }
 +
 +    @Test
 +    public void testDeletionOnSameSSTable() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        execute("DELETE FROM %s WHERE id=1 and col=30");
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 2, row(1, 20, "20"));
 +    }
 +
 +    @Test
 +    public void testDeletionOnMemTable() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        execute("DELETE FROM %s WHERE id=1 and col=30");
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 20, "20"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 20, "20"), row(1, 10, "10"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 20, "20"), row(1, 10, "10"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 20, "20"));
 +    }
 +
 +    @Test
 +    public void testDeletionOnIndexedSSTableDESC() throws Throwable
 +    {
 +        testDeletionOnIndexedSSTableDESC(true);
 +        testDeletionOnIndexedSSTableDESC(false);
 +    }
 +
 +    private void testDeletionOnIndexedSSTableDESC(boolean deleteWithRange) throws Throwable
 +    {
 +        // reduce the column index size so that columns get indexed during flush
 +        DatabaseDescriptor.setColumnIndexSize(1);
 +
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        for (int i = 1; i <= 1000; i++)
 +        {
 +            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +        }
 +        flush();
 +
 +        Object[][] allRows = new Object[1000][];
 +        for (int i = 1001; i <= 2000; i++)
 +        {
 +            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            allRows[2000 - i] = row(1, i, Integer.toString(i));
 +        }
 +
 +        if (deleteWithRange)
 +        {
 +            execute("DELETE FROM %s WHERE id=1 and col <= ?", 1000);
 +        }
 +        else
 +        {
 +            for (int i = 1; i <= 1000; i++)
 +                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
 +        }
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 2000, "2000"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 2000, "2000"), row(1, 1999, "1999"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 1, row(1, 2000, "2000"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 1, row(1, 2000, "2000"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 1, allRows);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 2, allRows);
 +    }
 +
 +    @Test
 +    public void testDeletionOnIndexedSSTableASC() throws Throwable
 +    {
 +        testDeletionOnIndexedSSTableASC(true);
 +        testDeletionOnIndexedSSTableASC(false);
 +    }
 +
 +    private void testDeletionOnIndexedSSTableASC(boolean deleteWithRange) throws Throwable
 +    {
 +        // reduce the column index size so that columns get indexed during flush
 +        DatabaseDescriptor.setColumnIndexSize(1);
 +
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
 +
 +        for (int i = 1; i <= 1000; i++)
 +        {
 +            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +        }
 +        flush();
 +
 +        Object[][] allRows = new Object[1000][];
 +        for (int i = 1001; i <= 2000; i++)
 +        {
 +            execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            allRows[i - 1001] = row(1, i, Integer.toString(i));
 +        }
 +        flush();
 +
 +        if (deleteWithRange)
 +        {
 +            execute("DELETE FROM %s WHERE id =1 and col <= ?", 1000);
 +        }
 +        else
 +        {
 +            for (int i = 1; i <= 1000; i++)
 +                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
 +        }
 +        flush();
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 1001, "1001"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 1001, "1001"), row(1, 1002, "1002"));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 3, allRows);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 3, row(1, 1001, "1001"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 2, allRows);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 3, allRows);
 +    }
 +
 +    @Test
 +    public void testDeletionOnOverlappingIndexedSSTable() throws Throwable
 +    {
 +        testDeletionOnOverlappingIndexedSSTable(true);
 +        testDeletionOnOverlappingIndexedSSTable(false);
 +    }
 +
 +    private void testDeletionOnOverlappingIndexedSSTable(boolean deleteWithRange) throws Throwable
 +    {
 +        // reduce the column index size so that columns get indexed during flush
 +        DatabaseDescriptor.setColumnIndexSize(1);
 +
 +        createTable("CREATE TABLE %s (id int, col int, val1 text, val2 text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)");
 +
 +        for (int i = 1; i <= 500; i++)
 +        {
 +            if (i % 2 == 0)
 +                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            else
 +                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
 +        }
 +
 +        for (int i = 1001; i <= 1500; i++)
 +        {
 +            if (i % 2 == 0)
 +                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            else
 +                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
 +        }
 +
 +        flush();
 +
 +        for (int i = 501; i <= 1000; i++)
 +        {
 +            if (i % 2 == 0)
 +                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            else
 +                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
 +        }
 +
 +        for (int i = 1501; i <= 2000; i++)
 +        {
 +            if (i % 2 == 0)
 +                execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i));
 +            else
 +                execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i));
 +        }
 +
 +        if (deleteWithRange)
 +        {
 +            execute("DELETE FROM %s WHERE id=1 and col > ? and col <= ?", 250, 750);
 +        }
 +        else
 +        {
 +            for (int i = 251; i <= 750; i++)
 +                execute("DELETE FROM %s WHERE id=1 and col = ?", i);
 +        }
 +
 +        flush();
 +
 +        Object[][] allRows = new Object[1500][]; // non deleted rows
 +        for (int i = 1; i <= 2000; i++)
 +        {
 +            if (i > 250 && i <= 750)
 +                continue; // skip deleted records
 +
 +            int idx = (i <= 250 ? i - 1 : i - 501);
 +
 +            if (i % 2 == 0)
 +                allRows[idx] = row(1, i, Integer.toString(i), null);
 +            else
 +                allRows[idx] = row(1, i, Integer.toString(i), Integer.toString(i));
 +        }
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 1, "1", "1"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 1, "1", "1"), row(1, 2, "2", null));
 +
 +        executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows);
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001", "1001"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 2, row(1, 1, "1", "1"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 500 LIMIT 1", 2, row(1, 751, "751", "751"));
 +        executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 500 LIMIT 1", 2, row(1, 1, "1", "1"));
 +    }
 +
 +    @Test
 +    public void testMultiplePartitionsDESC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)");
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 10, "10");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 10, "10");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 20, "20");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 20, "20");
 +        flush();
 +
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 30, "30");
 +        execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 30, "30");
 +        flush();
 +
 +        for (int i = 1; i <= 3; i++)
 +        {
 +            String base = "SELECT * FROM %s ";
 +
 +            executeAndCheck(base + String.format("WHERE id=%d LIMIT 1", i), 1, row(i, 30, "30"));
 +            executeAndCheck(base + String.format("WHERE id=%d LIMIT 2", i), 2, row(i, 30, "30"), row(i, 20, "20"));
 +            executeAndCheck(base + String.format("WHERE id=%d LIMIT 3", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
 +            executeAndCheck(base + String.format("WHERE id=%d", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10"));
 +
 +            executeAndCheck(base + String.format("WHERE id=%d AND col > 25 LIMIT 1", i), 1, row(i, 30, "30"));
 +            executeAndCheck(base + String.format("WHERE id=%d AND col < 40 LIMIT 1", i), 1, row(i, 30, "30"));
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 88ed52e,451af25..f7ced23
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@@ -210,7 -208,11 +211,12 @@@ public class SSTableCorruptionDetection
              for (int i = 0; i < numberOfPks; i++)
              {
                  DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i));
-                 try (UnfilteredRowIterator rowIter = sstable.iterator(dk, Slices.ALL, ColumnFilter.all(cfs.metadata), false, false))
+                 try (UnfilteredRowIterator rowIter = sstable.iterator(dk,
++                                                                      Slices.ALL,
+                                                                       ColumnFilter.all(cfs.metadata),
+                                                                       false,
+                                                                       false,
+                                                                       SSTableReadsListener.NOOP_LISTENER))
                  {
                      while (rowIter.hasNext())
                      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index d73c278,cf57b17..d1db09a
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@@ -182,7 -183,10 +183,7 @@@ public class SSTableScannerTes
          assert boundaries.length % 2 == 0;
          for (DataRange range : dataRanges(sstable.metadata, scanStart, scanEnd))
          {
-             try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), range, false))
 -            try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata),
 -                                                             range,
 -                                                             false,
 -                                                             SSTableReadsListener.NOOP_LISTENER))
++            try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), range, false, SSTableReadsListener.NOOP_LISTENER))
              {
                  for (int b = 0; b < boundaries.length; b += 2)
                      for (int i = boundaries[b]; i <= boundaries[b + 1]; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index 1c7d61f,e714c60..391927c
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@@ -223,7 -224,11 +224,12 @@@ public class SSTableWriterTest extends 
              try
              {
                  DecoratedKey dk = Util.dk("large_value");
-                 UnfilteredRowIterator rowIter = sstable.iterator(dk, Slices.ALL, ColumnFilter.all(cfs.metadata), false, false);
+                 UnfilteredRowIterator rowIter = sstable.iterator(dk,
++                                                                 Slices.ALL,
+                                                                  ColumnFilter.all(cfs.metadata),
+                                                                  false,
+                                                                  false,
+                                                                  SSTableReadsListener.NOOP_LISTENER);
                  while (rowIter.hasNext())
                  {
                      rowIter.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c9db9af/test/unit/org/apache/cassandra/io/sstable/format/ClientModeSSTableTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/format/ClientModeSSTableTest.java
index 7a741f9,48a8af5..d86a44f
--- a/test/unit/org/apache/cassandra/io/sstable/format/ClientModeSSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/format/ClientModeSSTableTest.java
@@@ -105,7 -107,11 +105,12 @@@ public class ClientModeSSTableTes
  
              ByteBuffer key = bytes(Integer.toString(100));
  
-             try (UnfilteredRowIterator iter = reader.iterator(metadata.decorateKey(key), Slices.ALL, ColumnFilter.selection(metadata.partitionColumns()), false, false))
 -            try (SliceableUnfilteredRowIterator iter = reader.iterator(metadata.decorateKey(key),
 -                                                                       ColumnFilter.selection(metadata.partitionColumns()),
 -                                                                       false,
 -                                                                       false,
 -                                                                       SSTableReadsListener.NOOP_LISTENER))
++            try (UnfilteredRowIterator iter = reader.iterator(metadata.decorateKey(key),
++                                                              Slices.ALL,
++                                                              ColumnFilter.selection(metadata.partitionColumns()),
++                                                              false,
++                                                              false,
++                                                              SSTableReadsListener.NOOP_LISTENER))
              {
                  assert iter.next().clustering().get(0).equals(key);
              }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org