Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:41 UTC

[12/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 05c2977..2ffb91e 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -25,6 +24,7 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -68,8 +68,8 @@ public class Slice
     private Slice(Bound start, Bound end)
     {
         assert start.isStart() && end.isEnd();
-        this.start = start.takeAlias();
-        this.end = end.takeAlias();
+        this.start = start;
+        this.end = end;
     }
 
     public static Slice make(Bound start, Bound end)
@@ -331,7 +331,7 @@ public class Slice
                  + Bound.serializer.serializedSize(slice.end, version, types);
         }
 
-        public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+        public Slice deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
         {
             Bound start = Bound.serializer.deserialize(in, version, types);
             Bound end = Bound.serializer.deserialize(in, version, types);
@@ -346,21 +346,19 @@ public class Slice
      */
     public static class Bound extends AbstractClusteringPrefix
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0]));
         public static final Serializer serializer = new Serializer();
 
-        /** The smallest start bound, i.e. the one that starts before any row. */
-        public static final Bound BOTTOM = inclusiveStartOf();
-        /** The biggest end bound, i.e. the one that ends after any row. */
-        public static final Bound TOP = inclusiveEndOf();
-
-        protected final Kind kind;
-        protected final ByteBuffer[] values;
+        /**
+         * The smallest and biggest bounds. Note that as range tombstone bounds are a special case of slice bounds,
+         * we want the BOTTOM and TOP to be the same object, but we alias them here because it's cleaner when dealing
+         * with slices to refer to Slice.Bound.BOTTOM and Slice.Bound.TOP.
+         */
+        public static final Bound BOTTOM = RangeTombstone.Bound.BOTTOM;
+        public static final Bound TOP = RangeTombstone.Bound.TOP;
 
         protected Bound(Kind kind, ByteBuffer[] values)
         {
-            this.kind = kind;
-            this.values = values;
+            super(kind, values);
         }
 
         public static Bound create(Kind kind, ByteBuffer[] values)
@@ -396,22 +394,6 @@ public class Slice
             return create(Kind.EXCL_END_BOUND, values);
         }
 
-        public static Bound exclusiveStartOf(ClusteringPrefix prefix)
-        {
-            ByteBuffer[] values = new ByteBuffer[prefix.size()];
-            for (int i = 0; i < prefix.size(); i++)
-                values[i] = prefix.get(i);
-            return exclusiveStartOf(values);
-        }
-
-        public static Bound inclusiveEndOf(ClusteringPrefix prefix)
-        {
-            ByteBuffer[] values = new ByteBuffer[prefix.size()];
-            for (int i = 0; i < prefix.size(); i++)
-                values[i] = prefix.get(i);
-            return inclusiveEndOf(values);
-        }
-
         public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
         {
             CBuilder builder = CBuilder.create(comparator);
@@ -426,21 +408,6 @@ public class Slice
             return builder.buildBound(isStart, isInclusive);
         }
 
-        public Kind kind()
-        {
-            return kind;
-        }
-
-        public int size()
-        {
-            return values.length;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            return values[i];
-        }
-
         public Bound withNewKind(Kind kind)
         {
             assert !kind.isBoundary();
@@ -480,24 +447,6 @@ public class Slice
             return withNewKind(kind().invert());
         }
 
-        public ByteBuffer[] getRawValues()
-        {
-            return values;
-        }
-
-        public void digest(MessageDigest digest)
-        {
-            for (int i = 0; i < size(); i++)
-                digest.update(get(i).duplicate());
-            FBUtilities.updateWithByte(digest, kind().ordinal());
-        }
-
-        public void writeTo(Slice.Bound.Writer writer)
-        {
-            super.writeTo(writer);
-            writer.writeBoundKind(kind());
-        }
-
         // For use by intersects, it's called with the sstable bound opposite to the slice bound
         // (so if the slice bound is a start, it's called with the max sstable bound)
         private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound)
@@ -544,66 +493,10 @@ public class Slice
             return sb.append(")").toString();
         }
 
-        // Overriding to get a more precise type
-        @Override
-        public Bound takeAlias()
-        {
-            return this;
-        }
-
-        @Override
-        public long unsharedHeapSize()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-        }
-
-        public static Builder builder(int size)
-        {
-            return new Builder(size);
-        }
-
-        public interface Writer extends ClusteringPrefix.Writer
-        {
-            public void writeBoundKind(Kind kind);
-        }
-
-        public static class Builder implements Writer
-        {
-            private final ByteBuffer[] values;
-            private Kind kind;
-            private int idx;
-
-            private Builder(int size)
-            {
-                this.values = new ByteBuffer[size];
-            }
-
-            public void writeClusteringValue(ByteBuffer value)
-            {
-                values[idx++] = value;
-            }
-
-            public void writeBoundKind(Kind kind)
-            {
-                this.kind = kind;
-            }
-
-            public Slice.Bound build()
-            {
-                assert idx == values.length;
-                return Slice.Bound.create(kind, values);
-            }
-        }
-
         /**
          * Serializer for slice bounds.
          * <p>
-         * Contrarily to {@code Clustering}, a slice bound can only be a true prefix of the full clustering, so we actually record
+         * Contrary to {@code Clustering}, a slice bound can be a true prefix of the full clustering, so we actually record
          * its size.
          */
         public static class Serializer
@@ -622,31 +515,21 @@ public class Slice
                      + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
             }
 
-            public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+            public Slice.Bound deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
             {
                 Kind kind = Kind.values()[in.readByte()];
                 return deserializeValues(in, kind, version, types);
             }
 
-            public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
+            public Slice.Bound deserializeValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
             {
                 int size = in.readUnsignedShort();
                 if (size == 0)
                     return kind.isStart() ? BOTTOM : TOP;
 
-                Builder builder = builder(size);
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder);
-                builder.writeBoundKind(kind);
-                return builder.build();
-            }
-
-            public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException
-            {
-                int size = in.readUnsignedShort();
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
-                writer.writeBoundKind(kind);
+                ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types);
+                return Slice.Bound.create(kind, values);
             }
-
         }
     }
 }
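
A note on the BOTTOM/TOP aliasing above: because Slice.Bound.BOTTOM and Slice.Bound.TOP
are now the very same objects as RangeTombstone.Bound.BOTTOM and RangeTombstone.Bound.TOP,
code elsewhere in this commit (see IndexState.findBlockIndex further down) can compare
bounds by reference. A minimal sketch of the idiom, using simplified stand-in types
rather than the real Cassandra classes:

    // Stand-ins; the real classes are RangeTombstone.Bound and Slice.Bound.
    class RTBound
    {
        static final RTBound BOTTOM = new RTBound();
        static final RTBound TOP = new RTBound();
    }

    class SliceBound
    {
        // Alias the exact same instances so identity checks hold across both views.
        static final RTBound BOTTOM = RTBound.BOTTOM;
        static final RTBound TOP = RTBound.TOP;
    }

    // Reference equality is then a valid fast path, as used by findBlockIndex:
    //   if (bound == SliceBound.BOTTOM) return -1;            // before any block
    //   if (bound == SliceBound.TOP)    return blocksCount(); // after every block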

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index a6c690b..32ca06d 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -28,6 +27,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -318,7 +318,7 @@ public abstract class Slices implements Iterable<Slice>
             return size;
         }
 
-        public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        public Slices deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
         {
             int size = in.readInt();
 
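The DataInput -> DataInputPlus substitution above recurs in most serializers touched by
this commit. Since DataInputPlus extends java.io.DataInput, the method bodies compile
unchanged; requiring the richer type simply lets deserialization code use Cassandra's
extra read primitives without casting. A sketch of the shape of the interface (the added
method is illustrative, not the real org.apache.cassandra.io.util API):

    import java.io.DataInput;
    import java.io.IOException;

    // Every DataInputPlus is a DataInput, so readInt()/readByte() calls are untouched;
    // only the parameter type of deserialize(...) had to change.
    interface DataInputPlusSketch extends DataInput
    {
        long readUnsignedVIntSketch() throws IOException; // hypothetical extra primitive
    }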

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e8247a3..df7e7ef 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1160,7 +1160,7 @@ public final class SystemKeyspace
 
         // delete all previous values with a single range tombstone.
         int nowInSec = FBUtilities.nowInSeconds();
-        update.addRangeTombstone(Slice.make(SizeEstimates.comparator, table), new SimpleDeletionTime(timestamp - 1, nowInSec));
+        update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
 
         // add a CQL row for each primary token range.
         for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
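
The replacement above keeps the delete-then-rewrite idiom for the size estimates table:
the range tombstone is written at timestamp - 1 so it shadows every previously written
estimate while leaving the rows inserted just afterwards, at timestamp, untouched. In
outline (update, table, timestamp and nowInSec come from the surrounding method):

    // One range tombstone covering the whole 'table' prefix deletes all prior
    // estimates; its timestamp is one less than that of the new rows.
    update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table),
                                  new DeletionTime(timestamp - 1, nowInSec)));
    // ... the fresh estimate rows are then added at 'timestamp' and survive.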

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index a9e432f..73766c8 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -68,6 +68,11 @@ public final class TypeSizes
         return sizeof(value.remaining()) + value.remaining();
     }
 
+    public static int sizeofWithVIntLength(ByteBuffer value)
+    {
+        return sizeofVInt(value.remaining()) + value.remaining();
+    }
+
     public static int sizeof(boolean value)
     {
         return BOOL_SIZE;
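
sizeofWithVIntLength mirrors sizeofWithLength but prefixes the value with a
variable-length integer rather than a fixed 4-byte int, so short values pay a 1-byte
length prefix instead of 4. For illustration only, here is a 7-bits-per-byte varint size
computation; Cassandra's actual vint encoding behind sizeofVInt differs in byte layout,
but the space argument is the same:

    // Illustrative varint size: 7 payload bits per byte, high bit as continuation.
    static int illustrativeVIntSize(long value)
    {
        int size = 1;
        while ((value >>>= 7) != 0)
            size++;
        return size;
    }

    // sizeofWithLength(buf)     = 4 + buf.remaining()   (fixed int prefix)
    // sizeofWithVIntLength(buf) = 1 + buf.remaining()   when buf.remaining() < 128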

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index cf7c2dd..b3709d2 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -111,8 +111,7 @@ public abstract class UnfilteredDeserializer
         private boolean isReady;
         private boolean isDone;
 
-        private final ReusableRow row;
-        private final RangeTombstoneMarker.Builder markerBuilder;
+        private final Row.Builder builder;
 
         private CurrentDeserializer(CFMetaData metadata,
                                     DataInputPlus in,
@@ -122,8 +121,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.header = header;
             this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
-            this.row = new ReusableRow(metadata.clusteringColumns().size(), header.columns().regulars, true, metadata.isCounter());
-            this.markerBuilder = new RangeTombstoneMarker.Builder(metadata.clusteringColumns().size());
+            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public boolean hasNext() throws IOException
@@ -181,17 +179,13 @@ public abstract class UnfilteredDeserializer
             isReady = false;
             if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
             {
-                markerBuilder.reset();
-                RangeTombstone.Bound.Kind kind = clusteringDeserializer.deserializeNextBound(markerBuilder);
-                UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, kind.isBoundary(), markerBuilder);
-                return markerBuilder.build();
+                RangeTombstone.Bound bound = clusteringDeserializer.deserializeNextBound();
+                return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
             }
             else
             {
-                Row.Writer writer = row.writer();
-                clusteringDeserializer.deserializeNextClustering(writer);
-                UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, writer);
-                return row;
+                builder.newRow(clusteringDeserializer.deserializeNextClustering());
+                return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, builder);
             }
         }
 
@@ -205,7 +199,7 @@ public abstract class UnfilteredDeserializer
             }
             else
             {
-                UnfilteredSerializer.serializer.skipRowBody(in, header, helper, nextFlags);
+                UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags);
             }
         }
 
@@ -221,7 +215,6 @@ public abstract class UnfilteredDeserializer
         private final boolean readAllAsDynamic;
         private boolean skipStatic;
 
-        private int nextFlags;
         private boolean isDone;
         private boolean isStart = true;
 
@@ -254,13 +247,7 @@ public abstract class UnfilteredDeserializer
 
         public boolean hasNext() throws IOException
         {
-            if (nextAtom != null)
-                return true;
-
-            if (isDone)
-                return false;
-
-            return deserializeNextAtom();
+            return nextAtom != null || (!isDone && deserializeNextAtom());
         }
 
         private boolean deserializeNextAtom() throws IOException
@@ -392,6 +379,7 @@ public abstract class UnfilteredDeserializer
             grouper.addAtom(nextAtom);
             while (deserializeNextAtom() && grouper.addAtom(nextAtom))
             {
+                // Nothing to do: deserializeNextAtom() updates nextAtom, which is then added to the grouper
             }
 
             // if this was the first static row, we're done with it. Otherwise, we're also done with static.
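
The condensed hasNext() above, nextAtom != null || (!isDone && deserializeNextAtom()),
is the usual single-element lookahead idiom: the next deserialized atom is cached in a
field and the stream is only touched when the cache is empty. A generic sketch of the
idiom, with a placeholder fetch() standing in for the actual deserialization:

    import java.io.IOException;
    import java.util.NoSuchElementException;

    abstract class LookaheadSketch<T>
    {
        private T next;          // element already read by hasNext(), if any
        private boolean isDone;  // set once fetch() reports exhaustion

        // Reads one element from the underlying source, or returns null at end of data.
        protected abstract T fetch() throws IOException;

        public boolean hasNext() throws IOException
        {
            if (next == null && !isDone)
            {
                next = fetch();
                isDone = (next == null);
            }
            return next != null;
        }

        public T next() throws IOException
        {
            if (!hasNext())
                throw new NoSuchElementException();
            T toReturn = next;
            next = null;
            return toReturn;
        }
    }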

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index b406251..8625112 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -38,8 +38,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(AbstractSSTableIterator.class);
-
     protected final SSTableReader sstable;
     protected final DecoratedKey key;
     protected final DeletionTime partitionLevelDeletion;
@@ -65,7 +63,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         this.sstable = sstable;
         this.key = key;
         this.columns = columnFilter;
-        this.helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+        this.helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
         this.isForThrift = isForThrift;
 
         if (indexEntry == null)
@@ -81,7 +79,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             {
                 // We seek to the beginning of the partition if either:
                 //   - the partition is not indexed; we then have a single block to read anyway
-                //     and we need to read the partition deletion time.
+                //     (and we need to read the partition deletion time).
                 //   - we're querying static columns.
                 boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty();
 
@@ -104,24 +102,24 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 
                     // Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow
                     // (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format).
-                    this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                    this.reader = needsReader ? createReader(indexEntry, file, true, shouldCloseFile) : null;
                     this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer);
                 }
                 else
                 {
                     this.partitionLevelDeletion = indexEntry.deletionTime();
                     this.staticRow = Rows.EMPTY_STATIC_ROW;
-                    this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                    this.reader = needsReader ? createReader(indexEntry, file, false, shouldCloseFile) : null;
                 }
 
-                if (reader == null && shouldCloseFile)
+                if (reader == null && file != null && shouldCloseFile)
                     file.close();
             }
             catch (IOException e)
             {
                 sstable.markSuspect();
                 String filePath = file.getPath();
-                if (shouldCloseFile && file != null)
+                if (shouldCloseFile)
                 {
                     try
                     {
@@ -164,7 +162,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             if (statics.isEmpty() || isForThrift)
                 return Rows.EMPTY_STATIC_ROW;
 
-            assert sstable.metadata.isStaticCompactTable() && !isForThrift;
+            assert sstable.metadata.isStaticCompactTable();
 
             // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the
             // static ones. So we don't have to mark the position to seek back later.
@@ -221,45 +219,13 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 
     public boolean hasNext()
     {
-        try
-        {
-            return reader != null && reader.hasNext();
-        }
-        catch (IOException e)
-        {
-            try
-            {
-                closeInternal();
-            }
-            catch (IOException suppressed)
-            {
-                e.addSuppressed(suppressed);
-            }
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, reader.file.getPath());
-        }
+        return reader != null && reader.hasNext();
     }
 
     public Unfiltered next()
     {
-        try
-        {
-            assert reader != null;
-            return reader.next();
-        }
-        catch (IOException e)
-        {
-            try
-            {
-                closeInternal();
-            }
-            catch (IOException suppressed)
-            {
-                e.addSuppressed(suppressed);
-            }
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, reader.file.getPath());
-        }
+        assert reader != null;
+        return reader.next();
     }
 
     public Iterator<Unfiltered> slice(Slice slice)
@@ -269,7 +235,8 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             if (reader == null)
                 return Collections.emptyIterator();
 
-            return reader.slice(slice);
+            reader.setForSlice(slice);
+            return reader;
         }
         catch (IOException e)
         {
@@ -317,7 +284,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         }
     }
 
-    protected abstract class Reader
+    protected abstract class Reader implements Iterator<Unfiltered>
     {
         private final boolean shouldCloseFile;
         public FileDataInput file;
@@ -327,12 +294,19 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         // Records the currently open range tombstone (if any)
         protected DeletionTime openMarker = null;
 
-        protected Reader(FileDataInput file, boolean shouldCloseFile)
+        // !isInit means we have never seeked in the file and thus should seek before reading anything
+        protected boolean isInit;
+
+        protected Reader(FileDataInput file, boolean isInit, boolean shouldCloseFile)
         {
             this.file = file;
+            this.isInit = isInit;
             this.shouldCloseFile = shouldCloseFile;
+
             if (file != null)
                 createDeserializer();
+            else
+                assert !isInit;
         }
 
         private void createDeserializer()
@@ -369,9 +343,62 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             return toReturn;
         }
 
-        public abstract boolean hasNext() throws IOException;
-        public abstract Unfiltered next() throws IOException;
-        public abstract Iterator<Unfiltered> slice(Slice slice) throws IOException;
+        public boolean hasNext() 
+        {
+            try
+            {
+                if (!isInit)
+                {
+                    init();
+                    isInit = true;
+                }
+
+                return hasNextInternal();
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    closeInternal();
+                }
+                catch (IOException suppressed)
+                {
+                    e.addSuppressed(suppressed);
+                }
+                sstable.markSuspect();
+                throw new CorruptSSTableException(e, reader.file.getPath());
+            }
+        }
+
+        public Unfiltered next()
+        {
+            try
+            {
+                return nextInternal();
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    closeInternal();
+                }
+                catch (IOException suppressed)
+                {
+                    e.addSuppressed(suppressed);
+                }
+                sstable.markSuspect();
+                throw new CorruptSSTableException(e, reader.file.getPath());
+            }
+        }
+
+        // Called if hasNext() is called but we haven't yet been initialized
+        protected abstract void init() throws IOException;
+
+        // Set the reader so its hasNext/next methods return values within the provided slice
+        public abstract void setForSlice(Slice slice) throws IOException;
+
+        protected abstract boolean hasNextInternal() throws IOException;
+        protected abstract Unfiltered nextInternal() throws IOException;
 
         public void close() throws IOException
         {
@@ -380,35 +407,61 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         }
     }
 
-    protected abstract class IndexedReader extends Reader
+    // Used by indexed readers to store where they are in the index.
+    protected static class IndexState
     {
-        protected final RowIndexEntry indexEntry;
-        protected final List<IndexHelper.IndexInfo> indexes;
+        private final Reader reader;
+        private final ClusteringComparator comparator;
 
-        protected int currentIndexIdx = -1;
+        private final RowIndexEntry indexEntry;
+        private final List<IndexHelper.IndexInfo> indexes;
+        private final boolean reversed;
 
-        // Marks the beginning of the block corresponding to currentIndexIdx.
-        protected FileMark mark;
+        private int currentIndexIdx = -1;
 
-        // !isInit means we have never seeked in the file and thus shouldn't read as we could be anywhere
-        protected boolean isInit;
+        // Marks the beginning of the block corresponding to currentIndexIdx.
+        private FileMark mark;
 
-        protected IndexedReader(FileDataInput file, boolean shouldCloseFile, RowIndexEntry indexEntry, boolean isInit)
+        public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed)
         {
-            super(file, shouldCloseFile);
+            this.reader = reader;
+            this.comparator = comparator;
             this.indexEntry = indexEntry;
             this.indexes = indexEntry.columnsIndex();
-            this.isInit = isInit;
+            this.reversed = reversed;
+            this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() : -1;
+        }
+
+        public boolean isDone()
+        {
+            return reversed ? currentIndexIdx < 0 : currentIndexIdx >= indexes.size();
         }
 
-        // Should be called when we're at the beginning of blockIdx.
-        protected void updateBlock(int blockIdx) throws IOException
+        // Sets the reader to the beginning of blockIdx.
+        public void setToBlock(int blockIdx) throws IOException
         {
-            seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
+            if (blockIdx >= 0 && blockIdx < indexes.size())
+                reader.seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
 
             currentIndexIdx = blockIdx;
-            openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
-            mark = file.mark();
+            reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
+            mark = reader.file.mark();
+        }
+
+        public int blocksCount()
+        {
+            return indexes.size();
+        }
+
+    // Check if we've crossed an index boundary (based on the mark at the beginning of the index block).
+        public boolean isPastCurrentBlock()
+        {
+            return currentIndexIdx < indexes.size() && reader.file.bytesPastMark(mark) >= currentIndex().width;
+        }
+
+        public int currentBlockIdx()
+        {
+            return currentIndexIdx;
         }
 
         public IndexHelper.IndexInfo currentIndex()
@@ -416,9 +469,16 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             return indexes.get(currentIndexIdx);
         }
 
-        public IndexHelper.IndexInfo previousIndex()
+        // Finds the index of the first block containing the provided bound, starting at the current index.
+        // Will be -1 if the bound is before any block, and blocksCount() if it is after every block.
+        public int findBlockIndex(Slice.Bound bound)
         {
-            return currentIndexIdx <= 1 ? null : indexes.get(currentIndexIdx - 1);
+            if (bound == Slice.Bound.BOTTOM)
+                return -1;
+            if (bound == Slice.Bound.TOP)
+                return blocksCount();
+
+            return IndexHelper.indexFor(bound, indexes, comparator, reversed, currentIndexIdx);
         }
     }
 }
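
The net effect of the Reader refactor above is a template method: hasNext()/next() now
live in the abstract Reader, performing the lazy init() and translating any IOException
into an unchecked CorruptSSTableException (after closing the file and marking the
sstable suspect), while subclasses only implement hasNextInternal()/nextInternal(). A
minimal sketch of the pattern, with the sstable/file plumbing reduced to a translate()
stub:

    import java.io.IOException;
    import java.util.Iterator;

    abstract class IOIteratorSketch<T> implements Iterator<T>
    {
        public boolean hasNext()
        {
            try
            {
                return hasNextInternal();
            }
            catch (IOException e)
            {
                throw translate(e); // close the file, mark suspect, wrap unchecked
            }
        }

        public T next()
        {
            try
            {
                return nextInternal();
            }
            catch (IOException e)
            {
                throw translate(e);
            }
        }

        protected abstract boolean hasNextInternal() throws IOException;
        protected abstract T nextInternal() throws IOException;

        // Stub for the CorruptSSTableException wrapping done in the real class.
        protected abstract RuntimeException translate(IOException e);
    }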

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 4fd5205..a58ea3e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -18,30 +18,19 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.NoSuchElementException;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  *  A Cell Iterator over SSTable
  */
 public class SSTableIterator extends AbstractSSTableIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(SSTableIterator.class);
-
     public SSTableIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
     {
         this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -71,222 +60,215 @@ public class SSTableIterator extends AbstractSSTableIterator
 
     private class ForwardReader extends Reader
     {
+        // The start of the current slice. This will be null as soon as we know we've passed that bound.
+        protected Slice.Bound start;
+        // The end of the current slice. Will never be null.
+        protected Slice.Bound end = Slice.Bound.TOP;
+
+        protected Unfiltered next; // the next element to return: this is computed by hasNextInternal().
+
+        protected boolean sliceDone; // set to true once we know we have no more results for the slice. This is in particular
+                                     // used by the indexed reader when we know we can't have results based on the index.
+
         private ForwardReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            super(file, shouldCloseFile);
-            assert isAtPartitionStart;
+            super(file, isAtPartitionStart, shouldCloseFile);
         }
 
-        public boolean hasNext() throws IOException
+        protected void init() throws IOException
         {
-            assert deserializer != null;
-            return deserializer.hasNext();
+            // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+            // have to initialize.
+            throw new IllegalStateException();
         }
 
-        public Unfiltered next() throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            return deserializer.readNext();
+            start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+            end = slice.end();
+
+            sliceDone = false;
+            next = null;
         }
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        // Skips all data that comes before the currently set slice.
+        // Returns what should be emitted first for the slice (a marker for an already-open range tombstone), or null if there is nothing to emit.
+        private Unfiltered handlePreSliceData() throws IOException
         {
-            return new AbstractIterator<Unfiltered>()
+            // Note that the following comparison is not strict. The reason is that the only case
+            // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary),
+            // and if we had a strict inequality and an open RT marker before this, we would issue
+            // the open marker first and then return the next marker later, which would send in the
+            // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
+            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
+            // clustering value as the slice, we'll simply record it in 'openMarker').
+            while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
             {
-                private boolean beforeStart = true;
+                if (deserializer.nextIsRow())
+                    deserializer.skipNext();
+                else
+                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+            }
+
+            Slice.Bound sliceStart = start;
+            start = null;
+
+            // We've reached the beginning of our queried slice. If we have an open marker
+            // we should return that first.
+            if (openMarker != null)
+                return new RangeTombstoneBoundMarker(sliceStart, openMarker);
 
-                protected Unfiltered computeNext()
+            return null;
+        }
+
+        // Computes the next element to return, assuming we're in the middle of the slice
+        // and the next element is either in the slice, or just after it. Returns null
+        // if we're done with the slice.
+        protected Unfiltered computeNext() throws IOException
+        {
+            if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0)
+                return null;
+
+            Unfiltered next = deserializer.readNext();
+            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                updateOpenMarker((RangeTombstoneMarker)next);
+            return next;
+        }
+
+        protected boolean hasNextInternal() throws IOException
+        {
+            if (next != null)
+                return true;
+
+            if (sliceDone)
+                return false;
+
+            assert deserializer != null;
+
+            if (start != null)
+            {
+                Unfiltered unfiltered = handlePreSliceData();
+                if (unfiltered != null)
                 {
-                    try
-                    {
-                        // While we're before the start of the slice, we can skip row but we should keep
-                        // track of open range tombstones
-                        if (beforeStart)
-                        {
-                            // Note that the following comparison is not strict. The reason is that the only cases
-                            // where it can be == is if the "next" is a RT start marker (either a '[' of a ')[' boundary),
-                            // and if we had a strict inequality and an open RT marker before this, we would issue
-                            // the open marker first, and then return then next later, which would yet in the
-                            // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
-                            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
-                            // clustering value than the slice, we'll simply record it in 'openMarker').
-                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
-                            {
-                                if (deserializer.nextIsRow())
-                                    deserializer.skipNext();
-                                else
-                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
-                            }
-
-                            beforeStart = false;
-
-                            // We've reached the beginning of our queried slice. If we have an open marker
-                            // we should return that first.
-                            if (openMarker != null)
-                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
-                        }
-
-                        if (deserializer.hasNext() && deserializer.compareNextTo(slice.end()) <= 0)
-                        {
-                            Unfiltered next = deserializer.readNext();
-                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
-                                updateOpenMarker((RangeTombstoneMarker)next);
-                            return next;
-                        }
-
-                        // If we have an open marker, we should close it before finishing
-                        if (openMarker != null)
-                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
-                        return endOfData();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
+                    next = unfiltered;
+                    return true;
                 }
-            };
+            }
+
+            next = computeNext();
+            if (next != null)
+                return true;
+
+            // If we have an open marker, we should close it before finishing
+            if (openMarker != null)
+            {
+                next = new RangeTombstoneBoundMarker(end, getAndClearOpenMarker());
+                return true;
+            }
+
+            sliceDone = true; // not absolutely necessary but accurate and cheap
+            return false;
+        }
+
+        protected Unfiltered nextInternal() throws IOException
+        {
+            if (!hasNextInternal())
+                throw new NoSuchElementException();
+
+            Unfiltered toReturn = next;
+            next = null;
+            return toReturn;
         }
     }
 
-    private class ForwardIndexedReader extends IndexedReader
+    private class ForwardIndexedReader extends ForwardReader
     {
+        private final IndexState indexState;
+
+        private int lastBlockIdx; // the last index block that has data for the current query
+
         private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
+            super(file, isAtPartitionStart, shouldCloseFile);
+            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false);
+            this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop
         }
 
-        public boolean hasNext() throws IOException
+        @Override
+        protected void init() throws IOException
         {
-            // If it's called before we've created the file, create it. This then mean
-            // we're reading from the beginning of the partition.
-            if (!isInit)
-            {
-                seekToPosition(indexEntry.position);
-                ByteBufferUtil.skipShortLength(file); // partition key
-                DeletionTime.serializer.skip(file);   // partition deletion
-                if (sstable.header.hasStatic())
-                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
-                isInit = true;
-            }
-            return deserializer.hasNext();
+            // If this is called, it means hasNext() was called before any call to setForSlice, which means
+            // we're reading everything from the beginning. So just set us up at the beginning of the first block.
+            indexState.setToBlock(0);
         }
 
-        public Unfiltered next() throws IOException
+        @Override
+        public void setForSlice(Slice slice) throws IOException
         {
-            return deserializer.readNext();
-        }
+            super.setForSlice(slice);
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
-        {
-            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+            isInit = true;
 
             // if our previous slicing already got us the biggest row in the sstable, we're done
-            if (currentIndexIdx >= indexes.size())
-                return Collections.emptyIterator();
+            if (indexState.isDone())
+            {
+                sliceDone = true;
+                return;
+            }
 
             // Find the first index block we'll need to read for the slice.
-            final int startIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, false, currentIndexIdx);
-            if (startIdx >= indexes.size())
-                return Collections.emptyIterator();
+            int startIdx = indexState.findBlockIndex(slice.start());
+            if (startIdx >= indexState.blocksCount())
+            {
+                sliceDone = true;
+                return;
+            }
 
             // If that's the last block we were reading, we're already where we want to be. Otherwise,
             // seek to that first block
-            if (startIdx != currentIndexIdx)
-                updateBlock(startIdx);
+            if (startIdx != indexState.currentBlockIdx())
+                indexState.setToBlock(startIdx);
 
             // Find the last index block we'll need to read for the slice.
-            final int endIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, false, startIdx);
-
-            final IndexHelper.IndexInfo startIndex = currentIndex();
+            lastBlockIdx = indexState.findBlockIndex(slice.end());
 
             // The index search is based on the last name of the index blocks, so at that point we have that:
-            //   1) indexes[startIdx - 1].lastName < slice.start <= indexes[startIdx].lastName
-            //   2) indexes[endIdx - 1].lastName < slice.end <= indexes[endIdx].lastName
-            // so if startIdx == endIdx and slice.end < indexes[startIdx].firstName, we're guaranteed that the
-            // whole slice is between the previous block end and this bloc start, and thus has no corresponding
+            //   1) indexes[currentIdx - 1].lastName < slice.start <= indexes[currentIdx].lastName
+            //   2) indexes[lastBlockIdx - 1].lastName < slice.end <= indexes[lastBlockIdx].lastName
+            // so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the
+            // whole slice is between the previous block end and this block start, and thus has no corresponding
             // data. One exception is if the previous block ends with an openMarker as it will cover our slice
             // and we need to return it.
-            if (startIdx == endIdx && metadata().comparator.compare(slice.end(), startIndex.firstName) < 0 && openMarker == null && sstable.descriptor.version.storeRows())
-                return Collections.emptyIterator();
-
-            return new AbstractIterator<Unfiltered>()
+            if (indexState.currentBlockIdx() == lastBlockIdx
+                && metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0
+                && openMarker == null
+                && sstable.descriptor.version.storeRows())
             {
-                private boolean beforeStart = true;
-                private int currentIndexIdx = startIdx;
+                sliceDone = true;
+            }
+        }
 
-                protected Unfiltered computeNext()
-                {
-                    try
-                    {
-                        // While we're before the start of the slice, we can skip row but we should keep
-                        // track of open range tombstones
-                        if (beforeStart)
-                        {
-                            // See ForwardReader equivalent method to see why this inequality is not strict.
-                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
-                            {
-                                if (deserializer.nextIsRow())
-                                    deserializer.skipNext();
-                                else
-                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
-                            }
-
-                            beforeStart = false;
-
-                            // We've reached the beginning of our queried slice. If we have an open marker
-                            // we should return that first.
-                            if (openMarker != null)
-                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
-                        }
-
-                        // If we've crossed an index block boundary, update our informations
-                        if (currentIndexIdx < indexes.size() && file.bytesPastMark(mark) >= currentIndex().width)
-                            updateBlock(++currentIndexIdx);
-
-                        // Return the next atom unless we've reached the end, or we're beyond our slice
-                        // end (note that unless we're on the last block for the slice, there is no point
-                        // in checking the slice end).
-                        if (currentIndexIdx < indexes.size()
-                            && currentIndexIdx <= endIdx
-                            && deserializer.hasNext()
-                            && (currentIndexIdx != endIdx || deserializer.compareNextTo(slice.end()) <= 0))
-                        {
-                            Unfiltered next = deserializer.readNext();
-                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
-                                updateOpenMarker((RangeTombstoneMarker)next);
-                            return next;
-                        }
-
-                        // If we have an open marker, we should close it before finishing
-                        if (openMarker != null)
-                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
-                        return endOfData();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
-                }
-            };
+        @Override
+        protected Unfiltered computeNext() throws IOException
+        {
+            // Our previous read might have made us cross an index block boundary. If so, update our information.
+            if (indexState.isPastCurrentBlock())
+                indexState.setToBlock(indexState.currentBlockIdx() + 1);
+
+            // Return the next unfiltered unless we've reached the end, or we're beyond our slice
+            // end (note that unless we're on the last block for the slice, there is no point
+            // in checking the slice end).
+            if (indexState.isDone()
+                || indexState.currentBlockIdx() > lastBlockIdx
+                || !deserializer.hasNext()
+                || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0))
+                return null;
+
+
+            Unfiltered next = deserializer.readNext();
+            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                updateOpenMarker((RangeTombstoneMarker)next);
+            return next;
         }
     }
 }
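
With Reader now an Iterator<Unfiltered>, slicing is stateful: setForSlice(slice)
positions the reader (skipping pre-slice data while keeping track of any open range
tombstone) and subsequent hasNext()/next() calls stream only that slice, synthesizing a
marker at slice.start() if a tombstone was already open and a closing one at slice.end()
if one still is. A hypothetical caller, with consume() as a stand-in for whatever
processes the stream:

    static void readSlice(AbstractSSTableIterator.Reader reader, Slice slice) throws IOException
    {
        reader.setForSlice(slice);   // resets the lookahead state for the new slice
        while (reader.hasNext())     // pre-slice skip + open-marker bookkeeping
            consume(reader.next());  // consume() is a hypothetical callback
    }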

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 0e18d4a..e15d330 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -20,29 +20,19 @@ package org.apache.cassandra.db.columniterator;
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.AbstractPartitionData;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  *  A Cell Iterator in reversed clustering order over SSTable
  */
 public class SSTableReversedIterator extends AbstractSSTableIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(SSTableReversedIterator.class);
-
     public SSTableReversedIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
     {
         this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -70,319 +60,290 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         return true;
     }
 
-    private ReusablePartitionData createBuffer(int blocksCount)
+    private class ReverseReader extends Reader
     {
-        int estimatedRowCount = 16;
-        int columnCount = metadata().partitionColumns().regulars.columnCount();
-        if (columnCount == 0 || metadata().clusteringColumns().size() == 0)
+        protected ReusablePartitionData buffer;
+        protected Iterator<Unfiltered> iterator;
+
+        private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            estimatedRowCount = 1;
+            super(file, isAtPartitionStart, shouldCloseFile);
         }
-        else
+
+        protected ReusablePartitionData createBuffer(int blocksCount)
         {
-            try
+            int estimatedRowCount = 16;
+            int columnCount = metadata().partitionColumns().regulars.columnCount();
+            if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
             {
-                // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
-                // we use the stats on the number of rows per partition for that sstable.
-                // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
-                // we divide by the number of regular columns the table has. We should fix once we collect the
-                // stats on rows
-                int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
-                estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+                estimatedRowCount = 1;
             }
-            catch (IllegalStateException e)
+            else
             {
-                // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
-                // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
+                try
+                {
+                    // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
+                    // we use the stats on the number of rows per partition for that sstable.
+                    // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
+                    // we divide by the number of regular columns the table has. We should fix once we collect the
+                    // stats on rows
+                    int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
+                    estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+                }
+                catch (IllegalStateException e)
+                {
+                    // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
+                    // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
+                }
             }
+            return new ReusablePartitionData(metadata(), partitionKey(), columns(), estimatedRowCount);
         }
-        return new ReusablePartitionData(metadata(), partitionKey(), DeletionTime.LIVE, columns(), estimatedRowCount);
-    }
-
-    private class ReverseReader extends Reader
-    {
-        private ReusablePartitionData partition;
-        private UnfilteredRowIterator iterator;
 
-        private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        protected void init() throws IOException
         {
-            super(file, shouldCloseFile);
-            assert isAtPartitionStart;
+            // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+            // have to initialize.
+            throw new IllegalStateException();
         }
 
-        public boolean hasNext() throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            if (partition == null)
+            // If we've already read the data, just create the iterator for the slice. Otherwise, read the data first.
+            if (buffer == null)
             {
-                partition = createBuffer(1);
-                partition.populateFrom(this, null, null, new Tester()
-                {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
-                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+                buffer = createBuffer(1);
+                // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
+                // every time, but that feels more wasteful), so we want to include everything from the beginning.
+                // We can stop at the slice end, however, since any following slice will be before it.
+                loadFromDisk(null, slice.end());
             }
+            setIterator(slice);
+        }
+
+        protected void setIterator(Slice slice)
+        {
+            assert buffer != null;
+            iterator = buffer.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+        }
+
+        protected boolean hasNextInternal() throws IOException
+        {
+            // If we've never called setForSlice, we're reading everything
+            if (iterator == null)
+                setForSlice(Slice.ALL);
+
             return iterator.hasNext();
         }
 
-        public Unfiltered next() throws IOException
+        protected Unfiltered nextInternal() throws IOException
         {
             if (!hasNext())
                 throw new NoSuchElementException();
             return iterator.next();
         }
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        protected boolean stopReadingDisk()
         {
-            if (partition == null)
+            return false;
+        }
+
+        // Reads the unfiltereds from disk and loads them into the reader buffer. It stops reading when either
+        // the partition is fully read or stopReadingDisk() returns true.
+        protected void loadFromDisk(Slice.Bound start, Slice.Bound end) throws IOException
+        {
+            buffer.reset();
+
+            // If the start might be in this block, skip everything that comes before it.
+            if (start != null)
             {
-                partition = createBuffer(1);
-                partition.populateFrom(this, slice.start(), slice.end(), new Tester()
+                while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
                 {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
+                    if (deserializer.nextIsRow())
+                        deserializer.skipNext();
+                    else
+                        updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+                }
             }
 
-            return partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
-        }
-    }
-
-    private class ReverseIndexedReader extends IndexedReader
-    {
-        private ReusablePartitionData partition;
-        private UnfilteredRowIterator iterator;
-
-        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
-        {
-            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
-            this.currentIndexIdx = indexEntry.columnsIndex().size();
-        }
+            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+            if (openMarker != null)
+            {
+                RangeTombstone.Bound markerStart = start == null ? RangeTombstone.Bound.BOTTOM : RangeTombstone.Bound.fromSliceBound(start);
+                buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
+            }
 
-        public boolean hasNext() throws IOException
-        {
-            // If it's called before we've created the file, create it. This then mean
-            // we're reading from the end of the partition.
-            if (!isInit)
+            // Now deserialize everything until we reach our requested end (if we have one)
+            while (deserializer.hasNext()
+                   && (end == null || deserializer.compareNextTo(end) <= 0)
+                   && !stopReadingDisk())
             {
-                seekToPosition(indexEntry.position);
-                ByteBufferUtil.skipShortLength(file); // partition key
-                DeletionTime.serializer.skip(file);   // partition deletion
-                if (sstable.header.hasStatic())
-                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
-                isInit = true;
+                Unfiltered unfiltered = deserializer.readNext();
+                buffer.add(unfiltered);
+
+                if (unfiltered.isRangeTombstoneMarker())
+                    updateOpenMarker((RangeTombstoneMarker)unfiltered);
             }
 
-            if (partition == null)
+            // If we have an open marker, we should close it before finishing
+            if (openMarker != null)
             {
-                partition = createBuffer(indexes.size());
-                partition.populateFrom(this, null, null, new Tester()
-                {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
-                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+                // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
+                RangeTombstone.Bound markerEnd = end == null ? RangeTombstone.Bound.TOP : RangeTombstone.Bound.fromSliceBound(end);
+                buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
             }
 
-            return iterator.hasNext();
+            buffer.build();
         }
+    }
+
+    private class ReverseIndexedReader extends ReverseReader
+    {
+        private final IndexState indexState;
 
-        public Unfiltered next() throws IOException
+        // The slice we're currently iterating over
+        private Slice slice;
+        // The last index block to consider for the slice
+        private int lastBlockIdx;
+
+        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            if (!hasNext())
-                throw new NoSuchElementException();
-            return iterator.next();
+            super(file, isAtPartitionStart, shouldCloseFile);
+            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true);
         }
 
-        private void prepareBlock(int blockIdx, Slice.Bound start, Slice.Bound end) throws IOException
+        protected void init() throws IOException
         {
-            updateBlock(blockIdx);
-
-            if (partition == null)
-                partition = createBuffer(indexes.size());
-            else
-                partition.clear();
-
-            final FileMark fileMark = mark;
-            final long width = currentIndex().width;
-
-            partition.populateFrom(this, start, end, new Tester()
-            {
-                public boolean isDone()
-                {
-                    return file.bytesPastMark(fileMark) >= width;
-                }
-            });
+            // If this is called, it means hasNext() was called before any call to setForSlice, which means
+            // we're reading everything from the end. So just set us up on the last block.
+            indexState.setToBlock(indexState.blocksCount() - 1);
         }
 
         @Override
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            // if our previous slicing already got us the smallest row in the sstable, we're done
-            if (currentIndexIdx < 0)
-                return Collections.emptyIterator();
+            this.slice = slice;
+            isInit = true;
 
-            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+            // if our previous slicing already got us past the beginning of the sstable, we're done
+            if (indexState.isDone())
+            {
+                iterator = Collections.emptyIterator();
+                return;
+            }
 
             // Find the first index block we'll need to read for the slice.
-            final int startIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, true, currentIndexIdx);
+            int startIdx = indexState.findBlockIndex(slice.end());
             if (startIdx < 0)
-                return Collections.emptyIterator();
+            {
+                iterator = Collections.emptyIterator();
+                return;
+            }
 
-            // Find the last index block we'll need to read for the slice.
-            int lastIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, true, startIdx);
+            boolean isCurrentBlock = startIdx == indexState.currentBlockIdx();
+            if (!isCurrentBlock)
+                indexState.setToBlock(startIdx);
 
-            // The index search is by firstname and so lastIdx is such that
-            //   indexes[lastIdx].firstName < slice.start <= indexes[lastIdx + 1].firstName
-            // However, if indexes[lastIdx].lastName < slice.start we can bump lastIdx.
-            if (lastIdx >= 0 && metadata().comparator.compare(indexes.get(lastIdx).lastName, slice.start()) < 0)
-                ++lastIdx;
+            lastBlockIdx = indexState.findBlockIndex(slice.start());
 
-            final int endIdx = lastIdx;
+            if (!isCurrentBlock)
+                readCurrentBlock(true);
 
-            // Because we're reversed, even if it is our current block, we should re-prepare the block since we would
-            // have skipped anything not in the previous slice.
-            prepareBlock(startIdx, slice.start(), slice.end());
+            setIterator(slice);
+        }
 
-            return new AbstractIterator<Unfiltered>()
-            {
-                private Iterator<Unfiltered> currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+        @Override
+        protected boolean hasNextInternal() throws IOException
+        {
+            if (super.hasNextInternal())
+                return true;
+
+            // We have nothing more for our current block, so move to the previous one.
+            int previousBlockIdx = indexState.currentBlockIdx() - 1;
+            if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx)
+                return false;
+
+            // The slice start can be in this previous block, which readCurrentBlock accounts for.
+            indexState.setToBlock(previousBlockIdx);
+            readCurrentBlock(false);
+            setIterator(slice);
+            // Since that new block is within the bounds we've computed in setForSlice(), we know there will
+            // always be something matching the slice unless we're on the lastBlockIdx (in which case there
+            // may or may not be results, but if there isn't, we're done for the slice).
+            return iterator.hasNext();
+        }
 
-                protected Unfiltered computeNext()
-                {
-                    try
-                    {
-                        if (currentBlockIterator.hasNext())
-                            return currentBlockIterator.next();
-
-                        --currentIndexIdx;
-                        if (currentIndexIdx < 0 || currentIndexIdx < endIdx)
-                            return endOfData();
-
-                        // Note that since we know we're read blocks backward, there is no point in checking the slice end, so we pass null
-                        prepareBlock(currentIndexIdx, slice.start(), null);
-                        currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
-                        return computeNext();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
-                }
-            };
+        /**
+         * Reads the current block, i.e. the one the index state is currently set to.
+         *
+         * @param canIncludeSliceEnd whether the block can include the slice end.
+         */
+        private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException
+        {
+            if (buffer == null)
+                buffer = createBuffer(indexState.blocksCount());
+
+            boolean canIncludeSliceStart = indexState.currentBlockIdx() == lastBlockIdx;
+            loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null);
         }
-    }
 
-    private abstract class Tester
-    {
-        public abstract boolean isDone();
+        @Override
+        protected boolean stopReadingDisk()
+        {
+            return indexState.isPastCurrentBlock();
+        }
     }
 
-    private class ReusablePartitionData extends AbstractPartitionData
+    private class ReusablePartitionData extends AbstractThreadUnsafePartition
     {
-        private final Writer rowWriter;
-        private final RangeTombstoneCollector markerWriter;
+        private MutableDeletionInfo.Builder deletionBuilder;
+        private MutableDeletionInfo deletionInfo;
 
         private ReusablePartitionData(CFMetaData metadata,
                                       DecoratedKey partitionKey,
-                                      DeletionTime deletionTime,
                                       PartitionColumns columns,
                                       int initialRowCapacity)
         {
-            super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, false);
+            super(metadata, partitionKey, columns, new ArrayList<>(initialRowCapacity));
+        }
 
-            this.rowWriter = new Writer(true);
-            // Note that even though the iterator handles the reverse case, this object holds the data for a single index bock, and we read index blocks in
-            // forward clustering order.
-            this.markerWriter = new RangeTombstoneCollector(false);
+        public DeletionInfo deletionInfo()
+        {
+            return deletionInfo;
         }
 
-        // Note that this method is here rather than in the readers because we want to use it for both readers and they
-        // don't extend one another
-        private void populateFrom(Reader reader, Slice.Bound start, Slice.Bound end, Tester tester) throws IOException
+        protected boolean canHaveShadowedData()
         {
-            // If we have a start bound, skip everything that comes before it.
-            while (reader.deserializer.hasNext() && start != null && reader.deserializer.compareNextTo(start) <= 0 && !tester.isDone())
-            {
-                if (reader.deserializer.nextIsRow())
-                    reader.deserializer.skipNext();
-                else
-                    reader.updateOpenMarker((RangeTombstoneMarker)reader.deserializer.readNext());
-            }
+            return false;
+        }
 
-            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
-            if (reader.openMarker != null)
-            {
-                // If we have no start but still an openMarker, this means we're indexed and it's coming from the previous block
-                Slice.Bound markerStart = start;
-                if (start == null)
-                {
-                    ClusteringPrefix c = ((IndexedReader)reader).previousIndex().lastName;
-                    markerStart = Slice.Bound.exclusiveStartOf(c);
-                }
-                writeMarker(markerStart, reader.openMarker);
-            }
+        public Row staticRow()
+        {
+            return Rows.EMPTY_STATIC_ROW; // we don't actually use that
+        }
 
-            // Now deserialize everything until we reach our requested end (if we have one)
-            while (reader.deserializer.hasNext()
-                   && (end == null || reader.deserializer.compareNextTo(end) <= 0)
-                   && !tester.isDone())
-            {
-                Unfiltered unfiltered = reader.deserializer.readNext();
-                if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                {
-                    ((Row) unfiltered).copyTo(rowWriter);
-                }
-                else
-                {
-                    RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
-                    reader.updateOpenMarker(marker);
-                    marker.copyTo(markerWriter);
-                }
-            }
+        public RowStats stats()
+        {
+            return RowStats.NO_STATS; // we don't actually use that
+        }
 
-            // If we have an open marker, we should close it before finishing
-            if (reader.openMarker != null)
-            {
-                // If we no end and still an openMarker, this means we're indexed and the marker can be close using the blocks end
-                Slice.Bound markerEnd = end;
-                if (end == null)
-                {
-                    ClusteringPrefix c = ((IndexedReader)reader).currentIndex().lastName;
-                    markerEnd = Slice.Bound.inclusiveEndOf(c);
-                }
-                writeMarker(markerEnd, reader.getAndClearOpenMarker());
-            }
+        public void add(Unfiltered unfiltered)
+        {
+            if (unfiltered.isRow())
+                rows.add((Row)unfiltered);
+            else
+                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
         }
 
-        private void writeMarker(Slice.Bound bound, DeletionTime dt)
+        public void reset()
         {
-            bound.writeTo(markerWriter);
-            markerWriter.writeBoundDeletion(dt);
-            markerWriter.endOfMarker();
+            rows.clear();
+            deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
         }
 
-        @Override
-        public void clear()
+        public void build()
         {
-            super.clear();
-            rowWriter.reset();
-            markerWriter.reset();
+            deletionInfo = deletionBuilder.build();
+            deletionBuilder = null;
         }
     }
 }

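An aside on the pattern above: because the sstable format is forward-only, ReverseReader never seeks
backward. It deserializes a block (or the whole partition, when there is no index) forward into a
ReusablePartitionData buffer, then serves each slice by iterating that buffer in reverse. A minimal,
self-contained sketch of that idea, with stand-in types rather than Cassandra's actual classes:

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.function.Predicate;

    // Not Cassandra's API: a stand-in showing the "read forward, iterate backward" idea.
    final class ReverseBuffer<T>
    {
        private final ArrayDeque<T> buffer = new ArrayDeque<>();

        // Drains a forward-only source into the buffer, stopping once the caller-supplied
        // predicate says we've read past the point of interest (cf. stopReadingDisk()).
        void loadFrom(Iterator<T> forwardSource, Predicate<T> stopAt)
        {
            buffer.clear();
            while (forwardSource.hasNext())
            {
                T next = forwardSource.next();
                if (stopAt.test(next))
                    break;
                buffer.addLast(next);
            }
        }

        // Serves the buffered items newest-first, as the reverse readers do per slice.
        Iterator<T> reverseIterator()
        {
            return buffer.descendingIterator();
        }
    }

Loading the values 1..5 with a stop predicate at 4 and then iterating reverseIterator() yields 3, 2, 1:
the forward read order is inverted without ever seeking backward in the source, which is what lets
setForSlice reuse one buffer across slices instead of re-reading from disk.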
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b3cb370..0149582 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.metrics.CompactionMetrics;
 
 /**
@@ -34,7 +33,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
  * <p>
  * On top of the actual merging the source iterators, this class:
  * <ul>
- *   <li>purge gc-able tombstones if possible (see PurgingPartitionIterator below).</li>
+ *   <li>purge gc-able tombstones if possible (see PurgeIterator below).</li>
  *   <li>update 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are
  *       not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
 *       on reads. This however means that potentially obsolete index entries could be kept a long time for
@@ -65,12 +64,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
      */
     private final long[] mergeCounters;
 
-    private final UnfilteredPartitionIterator mergedIterator;
+    private final UnfilteredPartitionIterator compacted;
     private final CompactionMetrics metrics;
 
-    // The number of row/RT merged by the iterator
-    private int merged;
-
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
     {
         this(type, scanners, controller, nowInSec, compactionId, null);
@@ -96,9 +92,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         if (metrics != null)
             metrics.beginCompaction(this);
 
-        this.mergedIterator = scanners.isEmpty()
-                            ? UnfilteredPartitionIterators.EMPTY
-                            : UnfilteredPartitionIterators.convertExpiredCellsToTombstones(new PurgingPartitionIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller), nowInSec);
+        this.compacted = scanners.isEmpty()
+                       ? UnfilteredPartitionIterators.EMPTY
+                       : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
     }
 
     public boolean isForThrift()
@@ -143,57 +139,46 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
                 CompactionIterator.this.updateCounterFor(merged);
 
-                /*
-                 * The row level listener does 2 things:
-                 *  - It updates 2ndary indexes for deleted/shadowed cells
-                 *  - It updates progress regularly (every UNFILTERED_TO_UPDATE_PROGRESS)
-                 */
-                final SecondaryIndexManager.Updater indexer = type == OperationType.COMPACTION
-                                                            ? controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec)
-                                                            : SecondaryIndexManager.nullUpdater;
+                if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
+                    return null;
 
-                return new UnfilteredRowIterators.MergeListener()
+                // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+                // TODO: this should probably be done asynchronously and batched.
+                final SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec);
+                final RowDiffListener diffListener = new RowDiffListener()
                 {
-                    private Clustering clustering;
+                    public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                    {
+                    }
 
-                    public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                    public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
                     {
                     }
 
-                    public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions)
+                    public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
                     {
-                        this.clustering = clustering;
                     }
 
-                    public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+                    public void onCell(int i, Clustering clustering, Cell merged, Cell original)
                     {
+                        if (original != null && (merged == null || !merged.isLive(nowInSec)))
+                            indexer.remove(clustering, original);
                     }
+                };
 
-                    public void onMergedCells(Cell mergedCell, Cell[] versions)
+                return new UnfilteredRowIterators.MergeListener()
+                {
+                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
                     {
-                        if (indexer == SecondaryIndexManager.nullUpdater)
-                            return;
-
-                        for (int i = 0; i < versions.length; i++)
-                        {
-                            Cell version = versions[i];
-                            if (version != null && (mergedCell == null || !mergedCell.equals(version)))
-                                indexer.remove(clustering, version);
-                        }
                     }
 
-                    public void onRowDone()
+                    public void onMergedRows(Row merged, Columns columns, Row[] versions)
                     {
-                        int merged = ++CompactionIterator.this.merged;
-                        if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
-                            updateBytesRead();
+                        Rows.diff(merged, columns, versions, diffListener);
                     }
 
                     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
                     {
-                        int merged = ++CompactionIterator.this.merged;
-                        if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
-                            updateBytesRead();
                     }
 
                     public void close()
@@ -218,12 +203,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
     public boolean hasNext()
     {
-        return mergedIterator.hasNext();
+        return compacted.hasNext();
     }
 
     public UnfilteredRowIterator next()
     {
-        return mergedIterator.next();
+        return compacted.next();
     }
 
     public void remove()
@@ -235,7 +220,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     {
         try
         {
-            mergedIterator.close();
+            compacted.close();
         }
         finally
         {
@@ -249,7 +234,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         return this.getCompactionInfo().toString();
     }
 
-    private class PurgingPartitionIterator extends TombstonePurgingPartitionIterator
+    private class PurgeIterator extends PurgingPartitionIterator
     {
         private final CompactionController controller;
 
@@ -257,28 +242,33 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         private long maxPurgeableTimestamp;
         private boolean hasCalculatedMaxPurgeableTimestamp;
 
-        private PurgingPartitionIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
+        private long compactedUnfiltered;
+
+        private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
         {
             super(toPurge, controller.gcBefore);
             this.controller = controller;
         }
 
         @Override
-        protected void onEmpty(DecoratedKey key)
+        protected void onEmptyPartitionPostPurge(DecoratedKey key)
         {
             if (type == OperationType.COMPACTION)
                 controller.cfs.invalidateCachedPartition(key);
         }
 
         @Override
-        protected boolean shouldFilter(UnfilteredRowIterator iterator)
+        protected void onNewPartition(DecoratedKey key)
         {
-            currentKey = iterator.partitionKey();
+            currentKey = key;
             hasCalculatedMaxPurgeableTimestamp = false;
+        }
 
-            // TODO: we could be able to skip filtering if UnfilteredRowIterator was giving us some stats
-            // (like the smallest local deletion time).
-            return true;
+        @Override
+        protected void updateProgress()
+        {
+            if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+                updateBytesRead();
         }
 
         /*

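The PurgeIterator above also absorbs progress tracking: instead of counting merged rows and markers
inside the merge listener, it counts every unfiltered flowing through updateProgress() and refreshes
the read progress every UNFILTERED_TO_UPDATE_PROGRESS items. A hedged sketch of that counting-wrapper
shape (names are illustrative, not the real API):

    import java.util.Iterator;

    // Illustrative only: wraps any iterator and fires a callback every UPDATE_EVERY items,
    // mirroring how PurgeIterator.updateProgress() drives updateBytesRead().
    final class ProgressCountingIterator<T> implements Iterator<T>
    {
        private static final int UPDATE_EVERY = 100; // stand-in for UNFILTERED_TO_UPDATE_PROGRESS

        private final Iterator<T> wrapped;
        private final Runnable onProgress;
        private long seen;

        ProgressCountingIterator(Iterator<T> wrapped, Runnable onProgress)
        {
            this.wrapped = wrapped;
            this.onProgress = onProgress;
        }

        public boolean hasNext()
        {
            return wrapped.hasNext();
        }

        public T next()
        {
            T item = wrapped.next();
            if (++seen % UPDATE_EVERY == 0)
                onProgress.run(); // e.g. updateBytesRead() in CompactionIterator
            return item;
        }
    }

Keeping the count in the purging layer leaves the merge listener focused on secondary index
maintenance, which is the other half of the change above.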
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index 29ea7fe..ed7584b 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
@@ -68,7 +68,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
             int i = 0;
             for (ColumnDefinition column : metadata.clusteringColumns())
                 sb.append(i++ == 0 ? "" : ", ").append(column.name).append(column.type instanceof ReversedType ? " ASC" : " DESC");
-            sb.append(")");
+            sb.append(')');
         }
     }
 
@@ -84,7 +84,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
             filter.serializeInternal(out, version);
         }
 
-        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
         {
             Kind kind = Kind.values()[in.readUnsignedByte()];
             boolean reversed = in.readBoolean();
@@ -104,6 +104,6 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
 
     protected static abstract class InternalDeserializer
     {
-        public abstract ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+        public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
     }
 }
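As in Slice.Serializer earlier, the DataInput -> DataInputPlus change leaves the usual
serialize/deserialize/serializedSize triple untouched; only the input type widens. For illustration,
a simplified version of the header pattern this deserializer uses (a kind byte followed by the
reversed flag), with plain java.io stand-ins for Cassandra's DataInputPlus/DataOutputPlus:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Simplified stand-in types; the point is the shape of the serializer,
    // not the exact Cassandra interfaces.
    final class HeaderSerializer
    {
        public void serialize(int kind, boolean reversed, DataOutput out) throws IOException
        {
            out.writeByte(kind);        // e.g. a Kind ordinal, as written above
            out.writeBoolean(reversed); // the "reversed" flag
        }

        public int[] deserialize(DataInput in) throws IOException
        {
            int kind = in.readUnsignedByte();
            boolean reversed = in.readBoolean();
            return new int[]{ kind, reversed ? 1 : 0 };
        }

        public long serializedSize()
        {
            return 2; // one byte for the kind ordinal, one for the flag
        }
    }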