Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/07 21:22:21 UTC

[2/2] git commit: Single-pass compaction patch by jbellis; reviewed by marcuse for CASSANDRA-4180

Single-pass compaction
patch by jbellis; reviewed by marcuse for CASSANDRA-4180


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9bd531b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9bd531b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9bd531b

Branch: refs/heads/trunk
Commit: a9bd531bc9a286cc8e81a800aaa29c1fead62dcd
Parents: 3ca160e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue May 7 14:20:53 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 7 14:21:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/Column.java       |   27 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    4 +-
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   46 +++-
 src/java/org/apache/cassandra/db/OnDiskAtom.java   |    5 +-
 .../apache/cassandra/db/RowIteratorFactory.java    |    3 +-
 .../columniterator/ICountableColumnIterator.java   |   25 --
 .../db/columniterator/IndexedSliceReader.java      |   29 +-
 .../db/columniterator/SSTableNamesIterator.java    |   10 +-
 .../db/columniterator/SimpleSliceReader.java       |   32 +--
 .../db/compaction/AbstractCompactedRow.java        |   18 +-
 .../cassandra/db/compaction/CompactionManager.java |   53 ++--
 .../cassandra/db/compaction/CompactionTask.java    |    5 +-
 .../db/compaction/LazilyCompactedRow.java          |  111 ++------
 .../db/compaction/ParallelCompactionIterable.java  |   28 +--
 .../cassandra/db/compaction/PrecompactedRow.java   |   40 +---
 .../apache/cassandra/db/compaction/Scrubber.java   |   49 ++--
 .../apache/cassandra/io/sstable/Descriptor.java    |    3 +
 .../io/sstable/SSTableBoundedScanner.java          |   95 ------
 .../io/sstable/SSTableIdentityIterator.java        |   99 ++-----
 .../apache/cassandra/io/sstable/SSTableReader.java |   13 +-
 .../cassandra/io/sstable/SSTableScanner.java       |  184 +++++-------
 .../apache/cassandra/io/sstable/SSTableWriter.java |  134 ++++-----
 .../cassandra/streaming/IncomingStreamReader.java  |   33 +--
 .../org/apache/cassandra/tools/SSTableExport.java  |   54 ++--
 test/unit/org/apache/cassandra/SchemaLoader.java   |   21 +-
 test/unit/org/apache/cassandra/Util.java           |   18 --
 test/unit/org/apache/cassandra/db/ScrubTest.java   |  222 ---------------
 .../apache/cassandra/db/SerializationsTest.java    |    3 +-
 .../cassandra/db/compaction/CompactionsTest.java   |    3 +-
 .../cassandra/io/LazilyCompactedRowTest.java       |   44 ++--
 .../apache/cassandra/io/sstable/SSTableTest.java   |  112 --------
 .../cassandra/service/LeaveAndBootstrapTest.java   |    2 +-
 .../org/apache/cassandra/service/MoveTest.java     |    2 +-
 .../org/apache/cassandra/service/RemoveTest.java   |    2 +-
 35 files changed, 424 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60accb1..4cd7b6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Single-pass compaction (CASSANDRA-4180)
  * Removed token range bisection (CASSANDRA-5518)
  * Removed compatibility with pre-1.2.5 sstables and network messages
    (CASSANDRA-5511)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index bf4f5b3..a8c1f94 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -50,33 +52,34 @@ public class Column implements OnDiskAtom
         return OnDiskAtom.Serializer.instance;
     }
 
+    /**
+     * For 2.0-formatted sstables (where column count is not stored), @param count should be Integer.MAX_VALUE,
+     * and we will look for the end-of-row column name marker instead of relying on the count.
+     */
     public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
     {
-        return new Iterator<OnDiskAtom>()
+        return new AbstractIterator<OnDiskAtom>()
         {
             int i = 0;
 
-            public boolean hasNext()
+            protected OnDiskAtom computeNext()
             {
-                return i < count;
-            }
+                if (i++ >= count)
+                    return endOfData();
 
-            public OnDiskAtom next()
-            {
-                ++i;
+                OnDiskAtom atom;
                 try
                 {
-                    return onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+                    atom = onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
                 }
                 catch (IOException e)
                 {
                     throw new IOError(e);
                 }
-            }
+                if (atom == null)
+                    return endOfData();
 
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
+                return atom;
             }
         };
     }
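
The rewrite above swaps the hand-rolled Iterator for Guava's AbstractIterator, which folds hasNext/next/remove into a single computeNext() that signals exhaustion via endOfData(). A minimal self-contained sketch of the same pattern follows; the null-as-sentinel convention mirrors what deserializeFromSSTable now returns at the end-of-row marker, and the source iterator is a stand-in, not Cassandra's API.

import java.util.Arrays;
import java.util.Iterator;

import com.google.common.collect.AbstractIterator;

public class SentinelIteratorSketch
{
    /**
     * Produces items until either `count` items have been returned or the
     * source yields null (playing the role of the end-of-row marker).
     */
    public static <T> Iterator<T> boundedOrSentinel(final Iterator<T> source, final int count)
    {
        return new AbstractIterator<T>()
        {
            int i = 0;

            protected T computeNext()
            {
                if (i++ >= count || !source.hasNext())
                    return endOfData();
                T item = source.next();
                if (item == null)
                    return endOfData(); // sentinel reached: stop without consuming further
                return item;
            }
        };
    }

    public static void main(String[] args)
    {
        Iterator<String> it = boundedOrSentinel(Arrays.asList("a", "b", null, "c").iterator(),
                                                Integer.MAX_VALUE);
        while (it.hasNext())
            System.out.println(it.next()); // prints a, b; stops at the null sentinel
    }
}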

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b07d97c..bbbb554 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1480,6 +1480,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
 
+            // todo this could be pushed into SSTableScanner
             return new AbstractScanIterator()
             {
                 protected Row computeNext()
@@ -1498,7 +1499,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     if (!range.contains(key))
                         return computeNext();
 
-                    logger.trace("scanned {}", key);
+                    if (logger.isTraceEnabled())
+                        logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
 
                     return current;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 57bd995..bcc5c2f 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
 {
@@ -65,13 +66,21 @@ public class ColumnIndex
         private final DataOutput output;
         private final RangeTombstone.Tracker tombstoneTracker;
         private int atomCount;
+        private final ByteBuffer key;
+        private final DeletionInfo deletionInfo;
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
                        DataOutput output,
                        boolean fromStream)
         {
-            this.indexOffset = rowHeaderSize(key, cf.deletionInfo());
+            assert cf != null;
+            assert key != null;
+            assert output != null;
+
+            this.key = key;
+            deletionInfo = cf.deletionInfo();
+            this.indexOffset = rowHeaderSize(key, deletionInfo);
             this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
             this.output = output;
             this.tombstoneTracker = fromStream ? null : new RangeTombstone.Tracker(cf.getComparator());
@@ -94,9 +103,7 @@ public class ColumnIndex
             // TODO fix constantSize when changing the native constants.
             int keysize = key.remaining();
             return typeSizes.sizeof((short) keysize) + keysize          // Row key
-                 + typeSizes.sizeof(0L)                                 // Row data size
-                 + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes)
-                 + typeSizes.sizeof(0);                                 // Column count
+                 + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes);
         }
 
         public RangeTombstone.Tracker tombstoneTracker()
@@ -119,6 +126,7 @@ public class ColumnIndex
          */
         public ColumnIndex build(ColumnFamily cf) throws IOException
         {
+            // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
             Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
             RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             Comparator<ByteBuffer> comparator = cf.getComparator();
@@ -138,15 +146,22 @@ public class ColumnIndex
                 add(tombstone);
                 tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             }
-            return build();
+            ColumnIndex index = build();
+
+            finish();
+
+            return index;
         }
 
         public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
         {
             for (OnDiskAtom c : columns)
                 add(c);
+            ColumnIndex index = build();
+
+            finish();
 
-            return build();
+            return index;
         }
 
         public void add(OnDiskAtom column) throws IOException
@@ -177,8 +192,8 @@ public class ColumnIndex
                 lastBlockClosing = column;
             }
 
-            if (output != null)
-                atomSerializer.serializeForSSTable(column, output);
+            maybeWriteRowHeader();
+            atomSerializer.serializeForSSTable(column, output);
 
             // TODO: Should deal with removing unneeded tombstones
             if (tombstoneTracker != null)
@@ -187,6 +202,15 @@ public class ColumnIndex
             lastColumn = column;
         }
 
+        private void maybeWriteRowHeader() throws IOException
+        {
+            if (lastColumn == null)
+            {
+                ByteBufferUtil.writeWithShortLength(key, output);
+                DeletionInfo.serializer().serializeForSSTable(deletionInfo, output);
+            }
+        }
+
         public ColumnIndex build()
         {
             // all columns were GC'd after all
@@ -204,5 +228,11 @@ public class ColumnIndex
             assert result.columnsIndex.size() > 0;
             return result;
         }
+
+        public void finish() throws IOException
+        {
+            if (!deletionInfo.equals(DeletionInfo.LIVE))
+                maybeWriteRowHeader();
+        }
     }
 }
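
The Builder now owns the row header (key plus deletion info) and writes it lazily: the first call to add() triggers it, and finish() covers rows that carry only a row-level tombstone and no atoms, so a row that was entirely GC'd writes no bytes at all. A condensed sketch of that deferral, under the assumption that header serialization can be reduced to two hypothetical hooks (writeKey, writeDeletionInfo):

import java.io.DataOutput;
import java.io.IOException;

/** Sketch: defer the row header so fully-purged rows emit nothing. */
abstract class LazyRowHeaderSketch
{
    protected final DataOutput output;
    private final boolean hasDeletionInfo; // i.e. deletionInfo != DeletionInfo.LIVE
    private boolean headerWritten;

    LazyRowHeaderSketch(DataOutput output, boolean hasDeletionInfo)
    {
        this.output = output;
        this.hasDeletionInfo = hasDeletionInfo;
    }

    void add(byte[] serializedAtom) throws IOException
    {
        maybeWriteRowHeader();     // first atom forces the header out
        output.write(serializedAtom);
    }

    void finish() throws IOException
    {
        if (hasDeletionInfo)
            maybeWriteRowHeader(); // tombstone-only rows still need their header
    }

    private void maybeWriteRowHeader() throws IOException
    {
        if (headerWritten)
            return;
        writeKey();                // hypothetical: key with a short length prefix
        writeDeletionInfo();       // hypothetical: top-level deletion time
        headerWritten = true;
    }

    protected abstract void writeKey() throws IOException;
    protected abstract void writeDeletionInfo() throws IOException;
}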

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 7cf7914..2fdb7ad 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -73,7 +73,10 @@ public interface OnDiskAtom
         {
             ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
             if (name.remaining() <= 0)
-                throw ColumnSerializer.CorruptColumnException.create(in, name);
+            {
+                // SSTableWriter.END_OF_ROW
+                return null;
+            }
 
             int b = in.readUnsignedByte();
             if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
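
With the up-front row size and column count gone, the reader needs an in-band terminator; the patch reuses the short-length prefix of the column name, since a zero-length name can never be a real column, and deserializeFromSSTable returns null there instead of throwing CorruptColumnException. A round-trip sketch of that framing, assuming END_OF_ROW is a zero short (consistent with the out.writeShort(SSTableWriter.END_OF_ROW) call further down in LazilyCompactedRow):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EndOfRowFramingSketch
{
    static final short END_OF_ROW = 0; // assumption: zero-length name prefix as terminator

    static void writeRow(DataOutputStream out, String... names) throws IOException
    {
        for (String name : names)
        {
            byte[] b = name.getBytes("UTF-8");
            out.writeShort(b.length); // short length prefix, as in writeWithShortLength
            out.write(b);
        }
        out.writeShort(END_OF_ROW);   // terminator instead of an up-front count
    }

    static void readRow(DataInputStream in) throws IOException
    {
        while (true)
        {
            int len = in.readUnsignedShort();
            if (len <= 0)
                return;               // end-of-row marker: the analogue of returning null
            byte[] b = new byte[len];
            in.readFully(b);
            System.out.println(new String(b, "UTF-8"));
        }
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeRow(new DataOutputStream(buf), "col1", "col2");
        readRow(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    }
}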

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 1bc506b..4588146 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -69,8 +69,7 @@ public class RowIteratorFactory
 
         for (SSTableReader sstable : sstables)
         {
-            final SSTableScanner scanner = sstable.getScanner(filter);
-            scanner.seekTo(startWith);
+            final SSTableScanner scanner = sstable.getScanner(filter, startWith);
             iterators.add(scanner);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
deleted file mode 100644
index 0b5fa97..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-public interface ICountableColumnIterator extends OnDiskAtomIterator
-{
-    public int getColumnCount();
-
-    public void reset();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index ffbca1a..ccf9149 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -74,16 +74,15 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         {
             Descriptor.Version version = sstable.descriptor.version;
             this.indexes = indexEntry.columnsIndex();
+            emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
             if (indexes.isEmpty())
             {
-                setToRowStart(sstable, indexEntry, input);
-                this.emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
+                setToRowStart(indexEntry, input);
                 emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
                 fetcher = new SimpleBlockFetcher();
             }
             else
             {
-                emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
                 emptyColumnFamily.delete(indexEntry.deletionTime());
                 fetcher = new IndexedBlockFetcher(indexEntry.position);
             }
@@ -98,19 +97,20 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     /**
      * Sets the seek position to the start of the row for column scanning.
      */
-    private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException
+    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
     {
-        if (input == null)
+        if (in == null)
         {
-            this.file = sstable.getFileDataInput(indexEntry.position);
+            this.file = sstable.getFileDataInput(rowEntry.position);
         }
         else
         {
-            this.file = input;
-            input.seek(indexEntry.position);
+            this.file = in;
+            in.seek(rowEntry.position);
         }
         sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-        file.readLong();
+        if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+            file.readLong();
     }
 
     public ColumnFamily getColumnFamily()
@@ -210,7 +210,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     private class IndexedBlockFetcher extends BlockFetcher
     {
         // where this row starts
-        private final long basePosition;
+        private final long columnsStart;
 
         // the index entry for the next block to deserialize
         private int nextIndexIdx = -1;
@@ -222,10 +222,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         // may still match a slice
         private final Deque<OnDiskAtom> prefetched;
 
-        public IndexedBlockFetcher(long basePosition)
+        public IndexedBlockFetcher(long columnsStart)
         {
             super(-1);
-            this.basePosition = basePosition;
+            this.columnsStart = columnsStart;
             this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
             setNextSlice();
         }
@@ -330,7 +330,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
 
             /* seek to the correct offset to the data, and calculate the data size */
-            long positionToSeek = basePosition + currentIndex.offset;
+            long positionToSeek = columnsStart + currentIndex.offset;
 
             // With new promoted indexes, our first seek in the data file will happen at that point.
             if (file == null)
@@ -420,7 +420,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             // We remember when we are within a slice to avoid some comparisons
             boolean inSlice = false;
 
-            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
             while (atomIterator.hasNext())
             {
                 OnDiskAtom column = atomIterator.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index bc0af30..f0e6dc7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -115,7 +115,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
 
             DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
             assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
-            file.readLong();
+            if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                file.readLong();
         }
 
         indexList = indexEntry.columnsIndex();
@@ -142,7 +143,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
         if (indexList.isEmpty())
         {
-            readSimpleColumns(file, columns, result);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            readSimpleColumns(file, columns, result, columnCount);
         }
         else
         {
@@ -153,9 +155,9 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         iter = result.iterator();
     }
 
-    private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result) throws IOException
+    private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result, int columnCount) throws IOException
     {
-        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
         int n = 0;
         while (atomIterator.hasNext())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index efbb92c..d0d74c6 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -30,21 +32,22 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
+
     private final FileDataInput file;
     private final boolean needsClosing;
     private final ByteBuffer finishColumn;
     private final AbstractType<?> comparator;
     private final ColumnFamily emptyColumnFamily;
-    private FileMark mark;
     private final Iterator<OnDiskAtom> atomIterator;
 
     public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
     {
+        logger.debug("Slicing {}", sstable);
         this.finishColumn = finishColumn;
         this.comparator = sstable.metadata.comparator;
         try
@@ -61,16 +64,17 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
                 this.needsClosing = false;
             }
 
+            Descriptor.Version version = sstable.descriptor.version;
+
             // Skip key and data size
             ByteBufferUtil.skipShortLength(file);
-            file.readLong();
-
-            Descriptor.Version version = sstable.descriptor.version;
+            if (version.hasRowSizeAndColumnCount)
+                file.readLong();
 
             emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
-            emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
-            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), version);
-            mark = file.mark();
+            emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
+            int columnCount = version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
         }
         catch (IOException e)
         {
@@ -84,20 +88,10 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
         if (!atomIterator.hasNext())
             return endOfData();
 
-        OnDiskAtom column;
-        try
-        {
-            file.reset(mark);
-            column = atomIterator.next();
-        }
-        catch (IOException e)
-        {
-            throw new CorruptSSTableException(e, file.getPath());
-        }
+        OnDiskAtom column = atomIterator.next();
         if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
             return endOfData();
 
-        mark = file.mark();
         return column;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 56e9353..cd2952a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.ColumnIndex;
@@ -48,7 +49,7 @@ public abstract class AbstractCompactedRow implements Closeable
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
      */
-    public abstract long write(DataOutput out) throws IOException;
+    public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 
     /**
      * update @param digest with the data bytes of the row (not including row key or row size).
@@ -59,23 +60,8 @@ public abstract class AbstractCompactedRow implements Closeable
     public abstract void update(MessageDigest digest);
 
     /**
-     * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved
-     */
-    public abstract boolean isEmpty();
-
-    /**
      * @return aggregate information about the columns in this row.  Some fields may
      * contain default values if computing their value would require extra effort we're not willing to make.
      */
     public abstract ColumnStats columnStats();
-
-    /**
-     * @return the compacted row deletion infos.
-     */
-    public abstract DeletionInfo deletionInfo();
-
-    /**
-     * @return the column index for this row.
-     */
-    public abstract ColumnIndex index();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 877d349..758068c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -482,9 +482,6 @@ public class CompactionManager implements CompactionManagerMBean
             if (logger.isDebugEnabled())
                 logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
-            SSTableWriter writer = null;
-            SSTableReader newSstable = null;
-
             logger.info("Cleaning up " + sstable);
             // Calculate the expected compacted filesize
             long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable), OperationType.CLEANUP);
@@ -498,6 +495,11 @@ public class CompactionManager implements CompactionManagerMBean
 
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             metrics.beginCompaction(ci);
+            SSTableWriter writer = createWriter(cfs,
+                                                compactionFileLocation,
+                                                expectedBloomFilterSize,
+                                                sstable);
+            SSTableReader newSstable = null;
             try
             {
                 while (scanner.hasNext())
@@ -508,11 +510,8 @@ public class CompactionManager implements CompactionManagerMBean
                     if (Range.isInRanges(row.getKey().token, ranges))
                     {
                         AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                        if (compactedRow.isEmpty())
-                            continue;
-                        writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, sstable);
-                        writer.append(compactedRow);
-                        totalkeysWritten++;
+                        if (writer.append(compactedRow) != null)
+                            totalkeysWritten++;
                     }
                     else
                     {
@@ -553,13 +552,14 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
                 }
-                if (writer != null)
+                if (totalkeysWritten > 0)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+                else
+                    writer.abort();
             }
             catch (Throwable e)
             {
-                if (writer != null)
-                    writer.abort();
+                writer.abort();
                 throw Throwables.propagate(e);
             }
             finally
@@ -589,23 +589,17 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
-                                                  OperationType compactionType,
-                                                  File compactionFileLocation,
-                                                  int expectedBloomFilterSize,
-                                                  SSTableWriter writer,
-                                                  SSTableReader sstable)
+    public static SSTableWriter createWriter(ColumnFamilyStore cfs,
+                                             File compactionFileLocation,
+                                             int expectedBloomFilterSize,
+                                             SSTableReader sstable)
     {
-        if (writer == null)
-        {
-            FileUtils.createDirectory(compactionFileLocation);
-            writer = new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
-                                       expectedBloomFilterSize,
-                                       cfs.metadata,
-                                       cfs.partitioner,
-                                       SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
-        }
-        return writer;
+        FileUtils.createDirectory(compactionFileLocation);
+        return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+                                 expectedBloomFilterSize,
+                                 cfs.metadata,
+                                 cfs.partitioner,
+                                 SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
     }
 
     /**
@@ -661,10 +655,7 @@ public class CompactionManager implements CompactionManagerMBean
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
                 AbstractCompactedRow row = iter.next();
-                if (row.isEmpty())
-                    row.close();
-                else
-                    validator.add(row);
+                validator.add(row);
             }
             validator.complete();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3b4fdc8..e32956b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -156,7 +156,8 @@ public class CompactionTask extends AbstractCompactionTask
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                 AbstractCompactedRow row = iter.next();
-                if (row.isEmpty())
+                RowIndexEntry indexEntry = writer.append(row);
+                if (indexEntry == null)
                 {
                     controller.invalidateCachedRow(row.key);
                     row.close();
@@ -166,8 +167,6 @@ public class CompactionTask extends AbstractCompactionTask
                 // If the row is cached, we call removeDeleted on it at read time to have coherent query returns,
                 // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
                 controller.removeDeletedInCache(row.key);
-
-                RowIndexEntry indexEntry = writer.append(row);
                 totalkeysWritten++;
 
                 if (DatabaseDescriptor.getPreheatKeyCache())
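
Since isEmpty() required a full pre-pass over the row, emptiness is now discovered as a side effect of writing: append() returns the row's RowIndexEntry, or null when nothing ended up on disk, and callers branch on that. A sketch of the caller-side contract with simplified stand-in types (Row, Writer, Cache here are hypothetical, not the Cassandra classes):

import java.util.Iterator;

class AppendLoopSketch
{
    interface Row { String key(); }
    interface Writer { Object append(Row row); /* null means nothing was written */ }
    interface Cache { void invalidate(String key); }

    static long writeAll(Iterator<Row> rows, Writer writer, Cache cache)
    {
        long keysWritten = 0;
        while (rows.hasNext())
        {
            Row row = rows.next();
            if (writer.append(row) == null)
            {
                // the row was purged entirely while writing: drop any cached copy
                cache.invalidate(row.key());
                continue;
            }
            keysWritten++;
        }
        return keysWritten;
    }
}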

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index b799f0d..444ee11 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -29,13 +29,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -45,35 +46,30 @@ import org.apache.cassandra.utils.StreamingHistogram;
  * of the rows being compacted, and merging them as it does so.  So the most we have
  * in memory at a time is the bloom filter, the index, and one column from each
  * pre-compaction row.
- *
- * When write() or update() is called, a second pass is made over the pre-compaction
- * rows to write the merged columns or update the hash, again with at most one column
- * from each row deserialized at a time.
  */
 public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
 {
     private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
 
-    private final List<? extends ICountableColumnIterator> rows;
+    private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
     private ColumnFamily emptyColumnFamily;
     private Reducer reducer;
-    private final ColumnStats columnStats;
-    private long columnSerializedSize;
+    private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
-    private ColumnIndex columnsIndex;
     private final SecondaryIndexManager.Updater indexer;
+    private long maxDelTimestamp;
 
-    public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
+    public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
         super(rows.get(0).getKey());
         this.rows = rows;
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        long maxDelTimestamp = Long.MIN_VALUE;
+        maxDelTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
@@ -85,58 +81,45 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 emptyColumnFamily.delete(cf);
         }
         this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+    }
+
+    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+    {
+        assert !closed;
 
+        ColumnIndex columnsIndex;
         try
         {
-            indexAndWrite(null);
+            indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
+            columnsIndex = indexBuilder.build(this);
+            if (columnsIndex.columnsIndex.isEmpty())
+            {
+                boolean cfIrrelevant = shouldPurge
+                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+                if (cfIrrelevant)
+                    return null;
+            }
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
-        // reach into the reducer used during iteration to get column count, size, max column timestamp
+        // reach into the reducer (created during iteration) to get column count, size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
-        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, 
-                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen, 
+        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
+                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
                                       reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
         );
-        columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
         reducer = null;
-    }
-
-    private void indexAndWrite(DataOutput out) throws IOException
-    {
-        this.indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-        this.columnsIndex = indexBuilder.build(this);
-    }
-
-    public long write(DataOutput out) throws IOException
-    {
-        assert !closed;
-
-        DataOutputBuffer clockOut = new DataOutputBuffer();
-        DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), clockOut);
-
-        long dataSize = clockOut.getLength() + columnSerializedSize;
-        if (logger.isDebugEnabled())
-            logger.debug(String.format("clock / column sizes are %s / %s", clockOut.getLength(), columnSerializedSize));
-        assert dataSize > 0;
-        out.writeLong(dataSize);
-        out.write(clockOut.getData(), 0, clockOut.getLength());
-        out.writeInt(indexBuilder.writtenAtomCount());
 
-        // We rebuild the column index uselessly, but we need to do that because range tombstone markers depend
-        // on indexing. If we're able to remove the two-phase compaction, we'll avoid that.
-        indexAndWrite(out);
-
-        long secondPassColumnSize = reducer == null ? 0 : reducer.serializedSize;
-        assert secondPassColumnSize == columnSerializedSize
-               : "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
+        out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
-        return dataSize;
+
+        return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex);
     }
 
     public void update(MessageDigest digest)
@@ -150,7 +133,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), out);
-            out.writeInt(columnStats.columnCount);
             digest.update(out.getData(), 0, out.getLength());
         }
         catch (IOException e)
@@ -158,30 +140,14 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             throw new AssertionError(e);
         }
 
+        // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
+        indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
         Iterator<OnDiskAtom> iter = iterator();
         while (iter.hasNext())
-        {
             iter.next().updateDigest(digest);
-        }
         close();
     }
 
-    public boolean isEmpty()
-    {
-        boolean cfIrrelevant = shouldPurge
-                             ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                             : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-        return cfIrrelevant && columnStats.columnCount == 0;
-    }
-
-    public int getEstimatedColumnCount()
-    {
-        int n = 0;
-        for (ICountableColumnIterator row : rows)
-            n += row.getColumnCount();
-        return n;
-    }
-
     public AbstractType<?> getComparator()
     {
         return emptyColumnFamily.getComparator();
@@ -189,8 +155,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
     public Iterator<OnDiskAtom> iterator()
     {
-        for (ICountableColumnIterator row : rows)
-            row.reset();
         reducer = new Reducer();
         Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
         return Iterators.filter(iter, Predicates.notNull());
@@ -217,19 +181,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         closed = true;
     }
 
-    public DeletionInfo deletionInfo()
-    {
-        return emptyColumnFamily.deletionInfo();
-    }
-
-    /**
-     * @return the column index for this row.
-     */
-    public ColumnIndex index()
-    {
-        return columnsIndex;
-    }
-
     private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
         // all columns reduced together will have the same name, so there will only be one column
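
The net effect is that write() now makes exactly one pass: the ColumnIndex.Builder serializes each merged atom as it indexes it (emitting the lazy row header first), the writer terminates the row with END_OF_ROW, and the RowIndexEntry is derived from the row's start position. A schematic sketch of that flow with simplified stand-in types; a real implementation would return a RowIndexEntry rather than a position, and would also handle tombstone-only rows via finish():

import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

class SinglePassWriteSketch
{
    static final short END_OF_ROW = 0; // assumption: zero short marker

    interface Atom {}
    interface IndexBuilder { void add(Atom atom) throws IOException; boolean isEmpty(); }

    /** @return the row's start position, or -1 if the row produced no output */
    static long write(long currentPosition, Iterator<Atom> merged,
                      IndexBuilder index, DataOutput out) throws IOException
    {
        while (merged.hasNext())
            index.add(merged.next()); // builder serializes the atom as it indexes it
        if (index.isEmpty())
            return -1;                // nothing written: caller treats the row as empty
        out.writeShort(END_OF_ROW);   // replaces the old row size + column count header
        return currentPosition;
    }
}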

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 718b1e3..c0bce30 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.utils.*;
 
@@ -164,7 +164,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
             }
 
-            List<ICountableColumnIterator> iterators = new ArrayList<ICountableColumnIterator>(rows.size());
+            List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
             for (RowContainer container : rows)
                 iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
             return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
@@ -203,7 +203,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             }
         }
 
-        private class DeserializedColumnIterator implements ICountableColumnIterator
+        private class DeserializedColumnIterator implements OnDiskAtomIterator
         {
             private final Row row;
             private Iterator<Column> iter;
@@ -224,16 +224,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 return row.key;
             }
 
-            public int getColumnCount()
-            {
-                return row.cf.getColumnCount();
-            }
-
-            public void reset()
-            {
-                iter = row.cf.iterator();
-            }
-
             public void close() throws IOException {}
 
             public boolean hasNext()
@@ -319,7 +309,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     /**
      * a wrapper around SSTII that notifies the given condition when it is closed
      */
-    private static class NotifyingSSTableIdentityIterator implements ICountableColumnIterator
+    private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
     {
         private final SSTableIdentityIterator wrapped;
         private final Condition condition;
@@ -340,16 +330,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             return wrapped.getKey();
         }
 
-        public int getColumnCount()
-        {
-            return wrapped.getColumnCount();
-        }
-
-        public void reset()
-        {
-            wrapped.reset();
-        }
-
         public void close() throws IOException
         {
             wrapped.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 1e7285a..1b5d4a9 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.MergeIterator;
 public class PrecompactedRow extends AbstractCompactedRow
 {
     private final ColumnFamily compactedCf;
-    private ColumnIndex columnIndex;
 
     /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
     public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
@@ -152,21 +152,12 @@ public class PrecompactedRow extends AbstractCompactedRow
         filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
     }
 
-    public long write(DataOutput out) throws IOException
+    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
     {
-        assert compactedCf != null;
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        ColumnIndex.Builder builder = new ColumnIndex.Builder(compactedCf, key.key, buffer);
-        columnIndex = builder.build(compactedCf);
-
-        TypeSizes typeSizes = TypeSizes.NATIVE;
-        long delSize = DeletionTime.serializer.serializedSize(compactedCf.deletionInfo().getTopLevelDeletion(), typeSizes);
-        long dataSize = buffer.getLength() + delSize + typeSizes.sizeof(0);
-        out.writeLong(dataSize);
-        DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), out);
-        out.writeInt(builder.writtenAtomCount());
-        out.write(buffer.getData(), 0, buffer.getLength());
-        return dataSize;
+        if (compactedCf == null)
+            return null;
+
+        return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
     }
 
     public void update(MessageDigest digest)
@@ -176,7 +167,6 @@ public class PrecompactedRow extends AbstractCompactedRow
         try
         {
             DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), buffer);
-            buffer.writeInt(compactedCf.getColumnCount());
             digest.update(buffer.getData(), 0, buffer.getLength());
         }
         catch (IOException e)
@@ -186,11 +176,6 @@ public class PrecompactedRow extends AbstractCompactedRow
         compactedCf.updateDigest(digest);
     }
 
-    public boolean isEmpty()
-    {
-        return compactedCf == null;
-    }
-
     public ColumnStats columnStats()
     {
         return compactedCf.getColumnStats();
@@ -207,18 +192,5 @@ public class PrecompactedRow extends AbstractCompactedRow
         return compactedCf;
     }
 
-    public DeletionInfo deletionInfo()
-    {
-        return compactedCf.deletionInfo();
-    }
-
-    /**
-     * @return the column index for this row.
-     */
-    public ColumnIndex index()
-    {
-        return columnIndex;
-    }
-
     public void close() { }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 421385a..5809741 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -111,7 +111,7 @@ public class Scrubber implements Closeable
             }
 
             // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
             AbstractCompactedRow prevRow = null;
 
@@ -168,25 +168,21 @@ public class Scrubber implements Closeable
                         throw new IOError(new IOException("Unable to read row key from data file"));
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+
+                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
                     AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (compactedRow.isEmpty())
+                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
                     {
-                        emptyRows++;
+                        outOfOrderRows.add(compactedRow);
+                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        continue;
                     }
-                    else
-                    {
-                        if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
-                        {
-                            outOfOrderRows.add(compactedRow);
-                            outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
-                            continue;
-                        }
 
-                        writer.append(compactedRow);
-                        prevRow = compactedRow;
+                    if (writer.append(compactedRow) == null)
+                        emptyRows++;
+                    else
                         goodRows++;
-                    }
+                    prevRow = compactedRow;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -204,24 +200,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
                             AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (compactedRow.isEmpty())
+                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
                             {
-                                emptyRows++;
+                                outOfOrderRows.add(compactedRow);
+                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                continue;
                             }
+                            if (writer.append(compactedRow) == null)
+                                emptyRows++;
                             else
-                            {
-                                if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
-                                {
-                                    outOfOrderRows.add(compactedRow);
-                                    outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
-                                    continue;
-                                }
-                                writer.append(compactedRow);
-                                prevRow = compactedRow;
                                 goodRows++;
-                            }
+                            prevRow = compactedRow;
                         }
                         catch (Throwable th2)
                         {
@@ -266,7 +257,7 @@ public class Scrubber implements Closeable
 
         if (!outOfOrderRows.isEmpty())
         {
-            SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
             for (AbstractCompactedRow row : outOfOrderRows)
                 inOrderWriter.append(row);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);

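The rewritten scrub loops above fold the old isEmpty() pre-check into the writer:
append() now returns null when the row produced no output, so scrub counts empty
vs. good rows from the return value instead of inspecting the row twice. A minimal
sketch of that contract, using hypothetical ToyRow/ToyWriter classes rather than
the real AbstractCompactedRow/SSTableWriter API:

    import java.util.ArrayList;
    import java.util.List;

    class ToyRow
    {
        final String key;
        final List<String> columns;

        ToyRow(String key, List<String> columns)
        {
            this.key = key;
            this.columns = columns;
        }
    }

    class ToyWriter
    {
        private final List<ToyRow> rows = new ArrayList<ToyRow>();

        /** @return the appended key, or null if the row was empty and nothing was written */
        String append(ToyRow row)
        {
            if (row.columns.isEmpty())
                return null; // caller tallies this as an empty row
            rows.add(row);
            return row.key;
        }
    }

With that shape the call site collapses to the single branch seen in both hunks:
if (writer.append(row) == null) emptyRows++; else goodRows++;
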
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index e2a7a18..fdc99b9 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -53,6 +53,7 @@ public class Descriptor
         //               into this new format)
         //             tracks max local deletiontime in sstable metadata
         //             records bloom_filter_fp_chance in metadata component
+        //             remove data size and column count from data file (CASSANDRA-4180)
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -63,6 +64,7 @@ public class Descriptor
         public final boolean tracksMaxLocalDeletionTime;
         public final boolean hasBloomFilterFPChance;
         public final boolean offHeapSummaries;
+        public final boolean hasRowSizeAndColumnCount;
 
         public Version(String version)
         {
@@ -72,6 +74,7 @@ public class Descriptor
             hasSuperColumns = version.compareTo("ja") < 0;
             hasBloomFilterFPChance = version.compareTo("ja") >= 0;
             offHeapSummaries = version.compareTo("ja") >= 0;
+            hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
         }
 
         /**

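The new flag follows Descriptor's existing convention: feature booleans are derived
from a lexicographic comparison of two-letter version strings, so sstables older
than "ja" are the ones that still embed a per-row size and column count. A
self-contained sketch of the same gating pattern (ToyVersion is illustrative, not
the real Descriptor.Version):

    public class ToyVersion
    {
        public final boolean hasRowSizeAndColumnCount;

        public ToyVersion(String version)
        {
            // two-letter versions sort lexicographically, so one compareTo
            // tells us whether this sstable predates format "ja"
            hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
        }

        public static void main(String[] args)
        {
            System.out.println(new ToyVersion("ib").hasRowSizeAndColumnCount); // true: legacy layout
            System.out.println(new ToyVersion("ja").hasRowSizeAndColumnCount); // false: new layout
        }
    }
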
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
deleted file mode 100644
index 4febea6..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * A SSTableScanner that only reads keys in a given range (for validation compaction).
- */
-public class SSTableBoundedScanner extends SSTableScanner
-{
-    private final Iterator<Pair<Long, Long>> rangeIterator;
-    private Pair<Long, Long> currentRange;
-
-    SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter)
-    {
-        super(sstable, limiter);
-        assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
-        this.rangeIterator = rangeIterator;
-        currentRange = rangeIterator.next();
-        dfile.seek(currentRange.left);
-    }
-
-    /*
-     * This shouldn't be used with a bounded scanner as it could put the
-     * bounded scanner outside its range.
-     */
-    @Override
-    public void seekTo(RowPosition seekKey)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
-        return iterator.hasNext();
-    }
-
-    @Override
-    public OnDiskAtomIterator next()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
-        return iterator.next();
-    }
-
-    protected class BoundedKeyScanningIterator extends KeyScanningIterator
-    {
-        @Override
-        public boolean hasNext()
-        {
-            if (!super.hasNext())
-                return false;
-
-            if (finishedAt < currentRange.right)
-                return true;
-
-            if (rangeIterator.hasNext())
-            {
-                currentRange = rangeIterator.next();
-                finishedAt = currentRange.left; // next() will seek for us
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-        }
-    }
-}

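Nothing here needs a dedicated subclass anymore: the same behavior falls out of the
rewritten SSTableScanner below, which seeks to the range's start once in the
constructor and then stops iterating at a precomputed stopAt offset. A toy sketch
of bounding a scan by start/stop offsets over an in-memory index (all names
invented for illustration):

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class BoundedPositionIterator implements Iterator<Long>
    {
        private final List<Long> positions; // sorted row start offsets
        private final long stopAt;
        private int i;

        public BoundedPositionIterator(List<Long> positions, long startAt, long stopAt)
        {
            this.positions = positions;
            this.stopAt = stopAt;
            while (i < positions.size() && positions.get(i) < startAt)
                i++; // the one-time "seek" to the start of the range
        }

        public boolean hasNext()
        {
            return i < positions.size() && positions.get(i) < stopAt;
        }

        public Long next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return positions.get(i++);
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }
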
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 8e636c5..d35e14d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -25,30 +25,25 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.BytesReadTracker;
 
-public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, ICountableColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
 
     private final DecoratedKey key;
-    private final DataInput input;
-    private final long dataStart;
-    public final long dataSize;
+    private final DataInput in;
+    public final long dataSize; // we [still] require this so compaction can tell if it's safe to read the row into memory
     public final ColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
     private final int columnCount;
-    private final long columnPosition;
 
     private final Iterator<OnDiskAtom> atomIterator;
     private final Descriptor.Version dataVersion;
 
-    private final BytesReadTracker inputWithTracker; // tracks bytes read
-
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
     private final int expireBefore;
 
@@ -60,13 +55,12 @@ public class SSTableIdentityIterator
      * @param sstable SSTable we are reading from.
      * @param file Reading using this file.
      * @param key Key of this row.
-     * @param dataStart Data for this row starts at this pos.
      * @param dataSize length of row data
      * @throws IOException
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize)
     {
-        this(sstable, file, key, dataStart, dataSize, false);
+        this(sstable, file, key, dataSize, false);
     }
 
     /**
@@ -74,39 +68,29 @@ public class SSTableIdentityIterator
      * @param sstable SSTable we are reading from.
      * @param file Reading using this file.
      * @param key Key of this row.
-     * @param dataStart Data for this row starts at this pos.
      * @param dataSize length of row data
      * @param checkData if true, do its best to deserialize and check the coherence of row data
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
     {
-        this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
-    }
-
-    // Must only be used against current file format
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, ColumnSerializer.Flag flag)
-    {
-        this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
+        this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
     }
 
     // sstable may be null *if* checkData is false
     // If it is null, we assume the data is in the current file format
     private SSTableIdentityIterator(CFMetaData metadata,
-                                    DataInput input,
+                                    DataInput in,
                                     String filename,
                                     DecoratedKey key,
-                                    long dataStart,
                                     long dataSize,
                                     boolean checkData,
                                     SSTableReader sstable,
                                     ColumnSerializer.Flag flag)
     {
         assert !checkData || (sstable != null);
-        this.input = input;
+        this.in = in;
         this.filename = filename;
-        this.inputWithTracker = new BytesReadTracker(input);
         this.key = key;
-        this.dataStart = dataStart;
         this.dataSize = dataSize;
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         this.flag = flag;
@@ -115,21 +99,10 @@ public class SSTableIdentityIterator
 
         try
         {
-            if (input instanceof RandomAccessReader)
-            {
-                RandomAccessReader file = (RandomAccessReader) input;
-                file.seek(this.dataStart);
-                if (dataStart + dataSize > file.length())
-                    throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s",
-                                          dataSize, dataStart, file.getPath(), file.length()));
-            }
-
             columnFamily = EmptyColumns.factory.create(metadata);
-            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
-
-            columnCount = inputWithTracker.readInt();
-            atomIterator = columnFamily.metadata().getOnDiskIterator(inputWithTracker, columnCount, dataVersion);
-            columnPosition = dataStart + inputWithTracker.getBytesRead();
+            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(in, dataVersion));
+            columnCount = dataVersion.hasRowSizeAndColumnCount ? in.readInt() : Integer.MAX_VALUE;
+            atomIterator = columnFamily.metadata().getOnDiskIterator(in, columnCount, dataVersion);
         }
         catch (IOException e)
         {
@@ -151,7 +124,18 @@ public class SSTableIdentityIterator
 
     public boolean hasNext()
     {
-        return inputWithTracker.getBytesRead() < dataSize;
+        try
+        {
+            return atomIterator.hasNext();
+        }
+        catch (IOError e)
+        {
+            // catch here b/c atomIterator is an AbstractIterator; hasNext reads the value
+            if (e.getCause() instanceof IOException)
+                throw new CorruptSSTableException((IOException)e.getCause(), filename);
+            else
+                throw e;
+        }
     }
 
     public OnDiskAtom next()
@@ -163,13 +147,6 @@ public class SSTableIdentityIterator
                 atom.validateFields(columnFamily.metadata());
             return atom;
         }
-        catch (IOError e)
-        {
-            if (e.getCause() instanceof IOException)
-                throw new CorruptSSTableException((IOException)e.getCause(), filename);
-            else
-                throw e;
-        }
         catch (MarshalException me)
         {
             throw new CorruptSSTableException(me, filename);
@@ -189,9 +166,9 @@ public class SSTableIdentityIterator
     public String getPath()
     {
         // if input is from file, then return that path, otherwise it's from streaming
-        if (input instanceof RandomAccessReader)
+        if (in instanceof RandomAccessReader)
         {
-            RandomAccessReader file = (RandomAccessReader) input;
+            RandomAccessReader file = (RandomAccessReader) in;
             return file.getPath();
         }
         else
@@ -202,10 +179,9 @@ public class SSTableIdentityIterator
 
     public ColumnFamily getColumnFamilyWithColumns(ColumnFamily.Factory containerFactory)
     {
-        assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
         // since we already read column count, just pass that value and continue deserialization
-        columnFamily.serializer.deserializeColumnsFromSSTable(inputWithTracker, cf, columnCount, flag, expireBefore, dataVersion);
+        columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
         if (validateColumns)
         {
             try
@@ -220,28 +196,9 @@ public class SSTableIdentityIterator
         return cf;
     }
 
-    private long headerSize()
-    {
-        return columnPosition - dataStart;
-    }
-
     public int compareTo(SSTableIdentityIterator o)
     {
         return key.compareTo(o.key);
     }
 
-    public void reset()
-    {
-        if (!(input instanceof RandomAccessReader))
-            throw new UnsupportedOperationException();
-
-        RandomAccessReader file = (RandomAccessReader) input;
-        file.seek(columnPosition);
-        inputWithTracker.reset(headerSize());
-    }
-
-    public int getColumnCount()
-    {
-        return columnCount;
-    }
 }

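One subtlety in the hasNext() change above: the on-disk atom iterator is a Guava
AbstractIterator, so the actual read happens when hasNext() computes the next
element, and the IOError-to-CorruptSSTableException translation has to wrap
hasNext() rather than next(). A minimal sketch of that wrapping, with a
hypothetical CorruptDataException standing in for CorruptSSTableException:

    import java.io.IOError;
    import java.io.IOException;
    import java.util.Iterator;

    class CorruptDataException extends RuntimeException
    {
        CorruptDataException(Throwable cause, String source)
        {
            super("corrupt data in " + source, cause);
        }
    }

    class TranslatingIterator<T> implements Iterator<T>
    {
        private final Iterator<T> delegate; // e.g. a Guava AbstractIterator
        private final String source;

        TranslatingIterator(Iterator<T> delegate, String source)
        {
            this.delegate = delegate;
            this.source = source;
        }

        public boolean hasNext()
        {
            try
            {
                // AbstractIterator fetches the next element here, so I/O
                // failures surface from hasNext(), not from next()
                return delegate.hasNext();
            }
            catch (IOError e)
            {
                if (e.getCause() instanceof IOException)
                    throw new CorruptDataException(e.getCause(), source);
                throw e;
            }
        }

        public T next()
        {
            return delegate.next();
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }
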
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a09f65b..baba472 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1000,10 +1000,15 @@ public class SSTableReader extends SSTable
      */
     public SSTableScanner getScanner(QueryFilter filter)
     {
-        return new SSTableScanner(this, filter);
+        return new SSTableScanner(this, filter, null);
     }
 
-   /**
+    public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
+    {
+        return new SSTableScanner(this, filter, startWith, null);
+    }
+
+    /**
     * I/O SSTableScanner
     * @return A Scanner for seeking over the rows of the SSTable.
     */
@@ -1014,7 +1019,7 @@ public class SSTableReader extends SSTable
 
    public SSTableScanner getScanner(RateLimiter limiter)
    {
-       return new SSTableScanner(this, limiter);
+       return new SSTableScanner(this, null, limiter);
    }
 
    /**
@@ -1030,7 +1035,7 @@ public class SSTableReader extends SSTable
 
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         if (rangeIterator.hasNext())
-            return new SSTableBoundedScanner(this, rangeIterator, limiter);
+            return new SSTableScanner(this, null, range, limiter);
         else
             return new EmptyCompactionScanner(getFilename());
     }

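getScannerForRange() above now builds a plain range-bounded SSTableScanner, while
still short-circuiting to an EmptyCompactionScanner when the range maps to no
on-disk positions. A toy sketch of that decide-then-construct shape (names
invented; the real code consults getPositionsForRanges):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class ToyScannerFactory
    {
        public static Iterator<Long> scannerForRange(List<Long> positionsInRange)
        {
            if (positionsInRange.isEmpty())
                return Collections.<Long>emptyList().iterator(); // analogous to EmptyCompactionScanner
            return positionsInRange.iterator(); // analogous to the bounded SSTableScanner
        }
    }
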
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 38b9e65..fb52a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,11 +18,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -31,6 +32,8 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,77 +43,83 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
-    private OnDiskAtomIterator row;
-    protected boolean exhausted = false;
     protected Iterator<OnDiskAtomIterator> iterator;
     private final QueryFilter filter;
+    private long stopAt;
 
     /**
-     * @param sstable SSTable to scan.
-     * @param limiter
+     * @param sstable SSTable to scan; must not be null
+     * @param filter filter to use when scanning the columns; may be null
+     * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, RateLimiter limiter)
+    SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
     {
-        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
-        this.ifile = sstable.openIndexReader();
-        this.sstable = sstable;
-        this.filter = null;
-    }
+        assert sstable != null;
 
-    /**
-     * @param sstable SSTable to scan.
-     * @param filter filter to use when scanning the columns
-     */
-    SSTableScanner(SSTableReader sstable, QueryFilter filter)
-    {
-        this.dfile = sstable.openDataReader();
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
         this.filter = filter;
+        stopAt = dfile.length();
     }
 
-    public void close() throws IOException
+    public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
     {
-        FileUtils.close(dfile, ifile);
-    }
+        this(sstable, filter, limiter);
+
+        long indexPosition = sstable.getIndexScanPosition(startWith);
+        // -1 means the key is before everything in the sstable. So just start from the beginning.
+        if (indexPosition == -1)
+            indexPosition = 0;
+        ifile.seek(indexPosition);
 
-    public void seekTo(RowPosition seekKey)
-    {
         try
         {
-            long indexPosition = sstable.getIndexScanPosition(seekKey);
-            // -1 means the key is before everything in the sstable. So just start from the beginning.
-            if (indexPosition == -1)
-                indexPosition = 0;
-
-            ifile.seek(indexPosition);
-
             while (!ifile.isEOF())
             {
-                long startPosition = ifile.getFilePointer();
+                indexPosition = ifile.getFilePointer();
                 DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(seekKey);
+                int comparison = indexDecoratedKey.compareTo(startWith);
                 if (comparison >= 0)
                 {
                     // Found, just read the dataPosition and seek into index and data files
                     long dataPosition = ifile.readLong();
-                    ifile.seek(startPosition);
+                    ifile.seek(indexPosition);
                     dfile.seek(dataPosition);
-                    row = null;
-                    return;
+                    break;
                 }
                 else
                 {
                     RowIndexEntry.serializer.skip(ifile);
                 }
             }
-            exhausted = true;
         }
         catch (IOException e)
         {
             sstable.markSuspect();
-            throw new CorruptSSTableException(e, ifile.getPath());
+            throw new CorruptSSTableException(e, sstable.getFilename());
         }
+
+    }
+
+    public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+    {
+        this(sstable, filter, range.toRowBounds().left, limiter);
+
+        if (range.isWrapAround())
+        {
+            stopAt = dfile.length();
+        }
+        else
+        {
+            RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
+            stopAt = position == null ? dfile.length() : position.position;
+        }
+    }
+
+    public void close() throws IOException
+    {
+        FileUtils.close(dfile, ifile);
     }
 
     public long getLengthInBytes()
@@ -131,14 +140,14 @@ public class SSTableScanner implements ICompactionScanner
     public boolean hasNext()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+            iterator = createIterator();
         return iterator.hasNext();
     }
 
     public OnDiskAtomIterator next()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+            iterator = createIterator();
         return iterator.next();
     }
 
@@ -149,76 +158,24 @@ public class SSTableScanner implements ICompactionScanner
 
     private Iterator<OnDiskAtomIterator> createIterator()
     {
-        return filter == null ? new KeyScanningIterator() : new FilteredKeyScanningIterator();
-    }
-
-    protected class KeyScanningIterator implements Iterator<OnDiskAtomIterator>
-    {
-        protected long finishedAt;
-
-        public boolean hasNext()
-        {
-            if (row == null)
-                return !dfile.isEOF();
-            return finishedAt < dfile.length();
-        }
-
-        public OnDiskAtomIterator next()
-        {
-            try
-            {
-                if (row != null)
-                    dfile.seek(finishedAt);
-                assert !dfile.isEOF();
-
-                // Read data header
-                DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dfile));
-                long dataSize = dfile.readLong();
-                long dataStart = dfile.getFilePointer();
-                finishedAt = dataStart + dataSize;
-
-                row = new SSTableIdentityIterator(sstable, dfile, key, dataStart, dataSize);
-                return row;
-            }
-            catch (IOException e)
-            {
-                sstable.markSuspect();
-                throw new CorruptSSTableException(e, dfile.getPath());
-            }
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String toString()
-        {
-            return getClass().getSimpleName() + "(" + "finishedAt:" + finishedAt + ")";
-        }
+        return new KeyScanningIterator();
     }
 
-    protected class FilteredKeyScanningIterator implements Iterator<OnDiskAtomIterator>
+    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
     {
-        protected DecoratedKey nextKey;
-        protected RowIndexEntry nextEntry;
-
-        public boolean hasNext()
-        {
-            if (row == null)
-                return !ifile.isEOF();
-            return nextKey != null;
-        }
+        private DecoratedKey nextKey;
+        private RowIndexEntry nextEntry;
+        private DecoratedKey currentKey;
+        private RowIndexEntry currentEntry;
 
-        public OnDiskAtomIterator next()
+        protected OnDiskAtomIterator computeNext()
         {
             try
             {
-                final DecoratedKey currentKey;
-                final RowIndexEntry currentEntry;
+                if (ifile.isEOF() && nextKey == null)
+                    return endOfData();
 
-                if (row == null)
+                if (currentKey == null)
                 {
                     currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                     currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
@@ -229,6 +186,10 @@ public class SSTableScanner implements ICompactionScanner
                     currentEntry = nextEntry;
                 }
 
+                assert currentEntry.position <= stopAt;
+                if (currentEntry.position == stopAt)
+                    return endOfData();
+
                 if (ifile.isEOF())
                 {
                     nextKey = null;
@@ -241,7 +202,18 @@ public class SSTableScanner implements ICompactionScanner
                 }
 
                 assert !dfile.isEOF();
-                return row = new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+
+                if (filter == null)
+                {
+                    dfile.seek(currentEntry.position);
+                    ByteBufferUtil.readWithShortLength(dfile); // key
+                    if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                        dfile.readLong();
+                    long dataSize = (nextEntry == null ? dfile.length() : nextEntry.position) - dfile.getFilePointer();
+                    return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
+                }
+
+                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
                 {
                     public OnDiskAtomIterator create()
                     {
@@ -252,14 +224,9 @@ public class SSTableScanner implements ICompactionScanner
             catch (IOException e)
             {
                 sstable.markSuspect();
-                throw new CorruptSSTableException(e, ifile.getPath());
+                throw new CorruptSSTableException(e, sstable.getFilename());
             }
         }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
     }
 
     @Override
@@ -269,7 +236,6 @@ public class SSTableScanner implements ICompactionScanner
                "dfile=" + dfile +
                " ifile=" + ifile +
                " sstable=" + sstable +
-               " exhausted=" + exhausted +
                ")";
     }
 }
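
The KeyScanningIterator rewrite above is the payoff of extending Guava's
AbstractIterator: the old hand-rolled hasNext()/next() state machine (and the
separate FilteredKeyScanningIterator) reduce to one computeNext() that returns
endOfData() when the scan is finished. A self-contained sketch of the pattern,
where iterating offsets up to a stopAt bound stands in for reading index entries:

    import java.util.Arrays;
    import java.util.Iterator;

    import com.google.common.collect.AbstractIterator;

    public class StoppingIterator extends AbstractIterator<Integer>
    {
        private final Iterator<Integer> positions;
        private final int stopAt;

        public StoppingIterator(Iterator<Integer> positions, int stopAt)
        {
            this.positions = positions;
            this.stopAt = stopAt;
        }

        @Override
        protected Integer computeNext()
        {
            if (!positions.hasNext())
                return endOfData();
            int position = positions.next();
            if (position >= stopAt) // loosely mirrors the currentEntry.position == stopAt check
                return endOfData();
            return position;
        }

        public static void main(String[] args)
        {
            Iterator<Integer> it = new StoppingIterator(Arrays.asList(0, 10, 20, 30).iterator(), 25);
            while (it.hasNext())
                System.out.println(it.next()); // prints 0, 10, 20
        }
    }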