Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/07 21:22:20 UTC

[1/2] Single-pass compaction patch by jbellis; reviewed by marcuse for CASSANDRA-4180

Updated Branches:
  refs/heads/trunk 3ca160ea7 -> a9bd531bc


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8defb10..43ea5a8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -40,6 +40,9 @@ public class SSTableWriter extends SSTable
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
 
+    // not very random, but the only value that can't be mistaken for a legal column-name length
+    public static final int END_OF_ROW = 0x0000;
+
     private IndexWriter iwriter;
     private SegmentedFile.Builder dbuilder;
     private final SequentialWriter dataFile;
@@ -135,7 +138,7 @@ public class SSTableWriter extends SSTable
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
+    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
     {
         lastWrittenKey = decoratedKey;
         last = lastWrittenKey;
@@ -144,30 +147,31 @@ public class SSTableWriter extends SSTable
 
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
-        // range tombstones are part of the Atoms we write as the row contents, so RIE only gets row-level tombstones
-        RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo.getTopLevelDeletion(), index);
-        iwriter.append(decoratedKey, entry);
+        iwriter.append(decoratedKey, index);
         dbuilder.addPotentialBoundary(dataPosition);
-        return entry;
     }
 
+    /**
+     * @param row the compacted row to write
+     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     */
     public RowIndexEntry append(AbstractCompactedRow row)
     {
         long currentPosition = beforeAppend(row.key);
+        RowIndexEntry entry;
         try
         {
-            ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
-            long dataStart = dataFile.getFilePointer();
-            long dataSize = row.write(dataFile.stream);
-            assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
-                   : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+            entry = row.write(currentPosition, dataFile.stream);
+            if (entry == null)
+                return null;
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
         sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+        afterAppend(row.key, currentPosition, entry);
+        return entry;
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -175,25 +179,8 @@ public class SSTableWriter extends SSTable
         long startPosition = beforeAppend(decoratedKey);
         try
         {
-            ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-
-            // Since the columnIndex may insert RangeTombstone marker, computing
-            // the size of the data is tricky.
-            DataOutputBuffer buffer = new DataOutputBuffer();
-
-            // build column index && write columns
-            ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, buffer);
-            ColumnIndex index = builder.build(cf);
-
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
-            dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
-
-            // Write deletion infos + column count
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
-            dataFile.stream.writeInt(builder.writtenAtomCount());
-            dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
-            afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+            afterAppend(decoratedKey, startPosition, entry);
         }
         catch (IOException e)
         {
@@ -202,38 +189,26 @@ public class SSTableWriter extends SSTable
         sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
     }
 
+    public static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutput out) throws IOException
+    {
+        assert cf.getColumnCount() > 0 || cf.isMarkedForDelete();
+
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.key, out);
+        ColumnIndex index = builder.build(cf);
+
+        out.writeShort(END_OF_ROW);
+        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+    }
+
     /**
      * @throws IOException if a read from the DataInput fails
      * @throws FSWriteError if a write to the dataFile fails
      */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in) throws IOException
     {
         long currentPosition = beforeAppend(key);
-        long dataStart;
-        try
-        {
-            ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
-            dataStart = dataFile.getFilePointer();
-            // write row size
-            dataFile.stream.writeLong(dataSize);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
 
         DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
-        int columnCount = in.readInt();
-
-        try
-        {
-            DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
-            dataFile.stream.writeInt(columnCount);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
 
         // deserialize each column to obtain maxTimestamp and immediately serialize it.
         long minTimestamp = Long.MAX_VALUE;
@@ -245,42 +220,45 @@ public class SSTableWriter extends SSTable
 
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream, true);
         OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
-        for (int i = 0; i < columnCount; i++)
+        try
         {
-            // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
-            // data size received, so we must reserialize the exact same data
-            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
-            if (atom instanceof CounterColumn)
-                atom = ((CounterColumn) atom).markDeltaToBeCleared();
-
-            int deletionTime = atom.getLocalDeletionTime();
-            if (deletionTime < Integer.MAX_VALUE)
+            while (true)
             {
-                tombstones.update(deletionTime);
-            }
-            minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
-            maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
-            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
+                // deserialize column with PRESERVE_SIZE so that we reserialize
+                // the exact same data we received from the stream
+                OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+                if (atom == null)
+                    break;
+                if (atom instanceof CounterColumn)
+                    atom = ((CounterColumn) atom).markDeltaToBeCleared();
+
+                int deletionTime = atom.getLocalDeletionTime();
+                if (deletionTime < Integer.MAX_VALUE)
+                {
+                    tombstones.update(deletionTime);
+                }
+                minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
+                maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+                maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
 
-            try
-            {
                 columnIndexer.add(atom); // This writes the atom to disk too
             }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, dataFile.getPath());
-            }
+
+            columnIndexer.finish();
+            dataFile.stream.writeShort(END_OF_ROW);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
         }
 
-        assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
-                : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
         sstableMetadataCollector.updateMinTimestamp(minTimestamp);
         sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
         sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
-        sstableMetadataCollector.addColumnCount(columnCount);
+        sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
         sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
-        afterAppend(key, currentPosition, deletionInfo, columnIndexer.build());
+        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, deletionInfo.getTopLevelDeletion(), columnIndexer.build()));
         return currentPosition;
     }
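
The heart of the single-pass change is visible above: the writer no longer emits an
up-front row size and column count (which forced either buffering a whole row in
memory or seeking back to patch the header), and instead terminates each row with
the two-byte END_OF_ROW marker. Below is a minimal, self-contained sketch of the
marker idea, using plain java.io types and hypothetical helper names rather than
Cassandra's actual serializers:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;

// Sketch only: length-prefixed column names can never have length 0, so 0x0000
// doubles as an unambiguous end-of-row marker and no count is needed up front.
public class EndOfRowSketch
{
    static final int END_OF_ROW = 0x0000;

    static void writeRow(DataOutput out, String... columnNames) throws IOException
    {
        for (String name : columnNames)
        {
            byte[] bytes = name.getBytes(Charset.forName("UTF-8"));
            out.writeShort(bytes.length); // legal names always serialize with length > 0
            out.write(bytes);
        }
        out.writeShort(END_OF_ROW);       // single pass: nothing to backfill
    }

    static int countColumns(DataInput in) throws IOException
    {
        int count = 0;
        while (true)
        {
            int length = in.readUnsignedShort();
            if (length == END_OF_ROW)     // scan to the marker instead of trusting a count
                return count;
            in.skipBytes(length);
            count++;
        }
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeRow(new DataOutputStream(buffer), "c1", "c2", "c3");
        DataInput in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(countColumns(in)); // prints 3
    }
}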
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index e1aaa56..92f5c7f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -27,15 +27,14 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.compress.lzf.LZFInputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.StorageService;
@@ -43,7 +42,6 @@ import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
-import com.ning.compress.lzf.LZFInputStream;
 
 public class IncomingStreamReader
 {
@@ -157,28 +155,11 @@ public class IncomingStreamReader
                 while (bytesRead < length)
                 {
                     in.reset(0);
+
                     key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-                    long dataSize = in.readLong();
-
-                    if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
-                    {
-                        // need to update row cache
-                        // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
-                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.getFilename(), key, 0, dataSize, ColumnSerializer.Flag.FROM_REMOTE);
-                        PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
-                        // We don't expire anything so the row shouldn't be empty
-                        assert !row.isEmpty();
-                        writer.append(row);
-
-                        // update cache
-                        ColumnFamily cf = row.getFullColumnFamily();
-                        cfs.maybeUpdateRowCache(key, cf);
-                    }
-                    else
-                    {
-                        writer.appendFromStream(key, cfs.metadata, dataSize, in);
-                        cfs.invalidateCachedRow(key);
-                    }
+                    writer.appendFromStream(key, cfs.metadata, in);
+
+                    cfs.invalidateCachedRow(key);
 
                     bytesRead += in.getBytesRead();
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
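
The simplified loop above leans on BytesReadTracker: in.reset(0) zeroes the counter
before each row and in.getBytesRead() reports how many bytes the row consumed,
which is how progress is accounted for now that rows no longer announce their size.
A rough sketch of that wrapper idea, showing only two of the DataInput methods (the
real utility delegates the whole interface):

import java.io.DataInput;
import java.io.IOException;

// Hypothetical simplification: count every byte consumed from the wrapped
// DataInput and let the caller zero the counter at each row boundary.
class TrackedInputSketch
{
    private final DataInput source;
    private long bytesRead;

    TrackedInputSketch(DataInput source)
    {
        this.source = source;
    }

    public long getBytesRead()
    {
        return bytesRead;
    }

    public void reset(long count)
    {
        bytesRead = count;
    }

    public int readUnsignedShort() throws IOException
    {
        int v = source.readUnsignedShort();
        bytesRead += 2; // an unsigned short is always two bytes
        return v;
    }

    public void readFully(byte[] b) throws IOException
    {
        source.readFully(b);
        bytesRead += b.length;
    }
}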

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 9e8bb3e..e3b02b1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -103,15 +104,15 @@ public class SSTableExport
      * </ul>
      *
      * @param out The output stream to write data
-     * @param cf  to which the metadata belongs
+     * @param deletionInfo the deletion info to which the metadata belongs
      */
-    private static void writeMeta(PrintStream out, ColumnFamily cf)
+    private static void writeMeta(PrintStream out, DeletionInfo deletionInfo)
     {
-        if (!cf.deletionInfo().equals(DeletionInfo.LIVE))
+        if (!deletionInfo.equals(DeletionInfo.LIVE))
         {
             // begin meta
             writeKey(out, "metadata");
-            writeDeletionInfo(out, cf.deletionInfo().getTopLevelDeletion());
+            writeDeletionInfo(out, deletionInfo.getTopLevelDeletion());
             out.print(",");
         }
     }
@@ -130,22 +131,22 @@ public class SSTableExport
      *
      * @param atoms      column iterator
      * @param out        output stream
-     * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)
      */
-    private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
+    private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, CFMetaData cfMetaData)
     {
         while (atoms.hasNext())
         {
-            writeJSON(out, serializeAtom(atoms.next(), comparator, cfMetaData));
+            writeJSON(out, serializeAtom(atoms.next(), cfMetaData));
 
             if (atoms.hasNext())
                 out.print(", ");
         }
     }
 
-    private static List<Object> serializeAtom(OnDiskAtom atom, AbstractType<?> comparator, CFMetaData cfMetaData)
+    private static List<Object> serializeAtom(OnDiskAtom atom, CFMetaData cfMetaData)
     {
+        AbstractType<?> comparator = cfMetaData.comparator;
         if (atom instanceof Column)
         {
             return serializeColumn((Column) atom, comparator, cfMetaData);
@@ -219,21 +220,22 @@ public class SSTableExport
      */
     private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
     {
-        ColumnFamily columnFamily = row.getColumnFamily();
-        CFMetaData cfMetaData = columnFamily.metadata();
-        AbstractType<?> comparator = columnFamily.getComparator();
+        serializeRow(row.getColumnFamily().deletionInfo(), row, row.getColumnFamily().metadata(), key, out);
+    }
 
+    private static void serializeRow(DeletionInfo deletionInfo, Iterator<OnDiskAtom> atoms, CFMetaData metadata, DecoratedKey key, PrintStream out)
+    {
         out.print("{");
         writeKey(out, "key");
         writeJSON(out, bytesToHex(key.key));
         out.print(",");
 
-        writeMeta(out, columnFamily);
+        writeMeta(out, deletionInfo);
 
         writeKey(out, "columns");
         out.print("[");
 
-        serializeAtoms(row, out, comparator, cfMetaData);
+        serializeAtoms(atoms, out, metadata);
 
         out.print("]");
         out.print("}");
@@ -277,10 +279,10 @@ public class SSTableExport
      */
     public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
     {
-        SSTableReader reader = SSTableReader.open(desc);
-        SSTableScanner scanner = reader.getScanner();
+        SSTableReader sstable = SSTableReader.open(desc);
+        RandomAccessReader dfile = sstable.openDataReader();
 
-        IPartitioner<?> partitioner = reader.partitioner;
+        IPartitioner<?> partitioner = sstable.partitioner;
 
         if (excludes != null)
             toExport.removeAll(Arrays.asList(excludes));
@@ -301,16 +303,20 @@ public class SSTableExport
 
             lastKey = decoratedKey;
 
-            scanner.seekTo(decoratedKey);
-
-            if (!scanner.hasNext())
+            RowIndexEntry entry = sstable.getPosition(decoratedKey, SSTableReader.Operator.EQ);
+            if (entry == null)
                 continue;
 
-            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-            if (!row.getKey().equals(decoratedKey))
-                continue;
+            dfile.seek(entry.position);
+            ByteBufferUtil.readWithShortLength(dfile); // row key
+            if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                dfile.readLong(); // row size
+            DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(dfile, sstable.descriptor.version);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? dfile.readInt() : Integer.MAX_VALUE;
+
+            Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, columnCount, sstable.descriptor.version);
 
-            serializeRow(row, decoratedKey, outs);
+            serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
 
             if (i != 0)
                 outs.println(",");
@@ -321,8 +327,6 @@ public class SSTableExport
 
         outs.println("\n]");
         outs.flush();
-
-        scanner.close();
     }
 
     // This is necessary to accommodate the test suite since you cannot open a Reader more
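
The export path above spells out the layout difference: pre-patch rows carry an
8-byte row size and a 4-byte column count after the key, post-patch rows carry
neither, so Integer.MAX_VALUE is passed as the count and the iterator reads until
the end-of-row marker. A simplified sketch of the version-dependent header read,
with a plain boolean standing in for Descriptor.Version and the deletion-info
deserialization elided:

import java.io.DataInput;
import java.io.IOException;

// Hypothetical, simplified row-header reader illustrating both on-disk layouts.
class RowHeaderSketch
{
    static int readRowHeader(DataInput in, boolean hasRowSizeAndColumnCount) throws IOException
    {
        int keyLength = in.readUnsignedShort(); // row key is always short-length-prefixed
        in.skipBytes(keyLength);

        if (hasRowSizeAndColumnCount)
            in.readLong();                      // old format only: total row size

        // (deletion info is deserialized here in both formats)

        // old format: trust the stored count; new format: scan until END_OF_ROW
        return hasRowSizeAndColumnCount ? in.readInt() : Integer.MAX_VALUE;
    }
}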

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 338abb9..015ece0 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -52,12 +52,12 @@ public class SchemaLoader
     private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
 
     @BeforeClass
-    public static void loadSchema() throws IOException
+    public static void loadSchema() throws IOException, ConfigurationException
     {
         loadSchema(false);
     }
 
-    public static void loadSchema(boolean withOldCfIds) throws IOException
+    public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException
     {
         // Cleanup first
         cleanupAndLeaveDirs();
@@ -72,18 +72,13 @@ public class SchemaLoader
             }
         });
 
-
-        // Migrations aren't happy if gossiper is not started
+        // Migrations aren't happy if the gossiper is not started.  Even if we don't use migrations though,
+        // some tests now expect us to start gossip for them.
         startGossiper();
-        try
-        {
-            for (KSMetaData ksm : schemaDefinition(withOldCfIds))
-                MigrationManager.announceNewKeyspace(ksm);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
+        // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
+        // Schema.instance.load(schemaDefinition(withOldCfIds));
+        for (KSMetaData ksm : schemaDefinition(withOldCfIds))
+            MigrationManager.announceNewKeyspace(ksm);
     }
 
     public static void startGossiper()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index c1e35b4..a14fd99 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -285,21 +284,4 @@ public class Util
 
         assert thrown : exception.getName() + " not received";
     }
-
-    public static ByteBuffer serializeForSSTable(ColumnFamily cf)
-    {
-        try
-        {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(baos);
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), out);
-            out.writeInt(cf.getColumnCount());
-            new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, out).build(cf);
-            return ByteBuffer.wrap(baos.toByteArray());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
deleted file mode 100644
index d1635d8..0000000
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
-
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(OrderedJUnit4ClassRunner.class)
-public class ScrubTest extends SchemaLoader
-{
-    public String TABLE = "Keyspace1";
-    public String CF = "Standard1";
-    public String CF3 = "Standard2";
-
-    public String copySSTables(String cf) throws IOException
-    {
-        String root = System.getProperty("corrupt-sstable-root");
-        assert root != null;
-        File rootDir = new File(root);
-        assert rootDir.isDirectory();
-
-        File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1);
-
-        String corruptSSTableName = null;
-
-        FileUtils.createDirectory(destDir);
-        for (File srcFile : rootDir.listFiles())
-        {
-            if (srcFile.getName().equals(".svn"))
-                continue;
-            if (!srcFile.getName().contains(cf))
-                continue;
-            File destFile = new File(destDir, srcFile.getName());
-            CLibrary.createHardLink(srcFile, destFile);
-
-            assert destFile.exists() : destFile.getAbsoluteFile();
-
-            if(destFile.getName().endsWith("Data.db"))
-                corruptSSTableName = destFile.getCanonicalPath();
-        }
-
-        assert corruptSSTableName != null;
-        return corruptSSTableName;
-    }
-
-    @Test
-    public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
-        List<Row> rows;
-
-        // insert data and verify we get it back w/ range query
-        fillCF(cfs, 1);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(1, rows.size());
-
-        CompactionManager.instance.performScrub(cfs);
-
-        // check data is still there
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(1, rows.size());
-    }
-
-    @Test
-    public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3);
-
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF3);
-        cf.delete(new DeletionInfo(0, 1)); // expired tombstone
-        RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1), cf);
-        rm.applyUnsafe();
-        cfs.forceBlockingFlush();
-
-        CompactionManager.instance.performScrub(cfs);
-        assert cfs.getSSTables().isEmpty();
-    }
-
-    @Test
-    public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
-        List<Row> rows;
-
-        // insert data and verify we get it back w/ range query
-        fillCF(cfs, 10);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(10, rows.size());
-
-        CompactionManager.instance.performScrub(cfs);
-
-        // check data is still there
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(10, rows.size());
-    }
-
-    @Test
-    public void testScubOutOfOrder() throws Exception
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        String columnFamily = "Standard3";
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily);
-
-        /*
-         * Code used to generate an outOfOrder sstable. The test for out-of-order key in SSTableWriter must also be commented out.
-         * The test also assumes an ordered partitioner.
-         *
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-        cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L));
-
-        SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))),
-                                                 cfs.metadata.getIndexInterval(),
-                                                 cfs.metadata,
-                                                 cfs.partitioner,
-                                                 SSTableMetadata.createCollector());
-        writer.append(Util.dk("a"), cf);
-        writer.append(Util.dk("b"), cf);
-        writer.append(Util.dk("z"), cf);
-        writer.append(Util.dk("c"), cf);
-        writer.append(Util.dk("y"), cf);
-        writer.append(Util.dk("d"), cf);
-        writer.closeAndOpenReader();
-        */
-
-        copySSTables(columnFamily);
-        cfs.loadNewSSTables();
-        assert cfs.getSSTables().size() > 0;
-
-        List<Row> rows;
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assert !isRowOrdered(rows) : "'corrupt' test file actually was not";
-
-        CompactionManager.instance.performScrub(cfs);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assert isRowOrdered(rows) : "Scrub failed: " + rows;
-        assert rows.size() == 6 : "Got " + rows.size();
-    }
-
-    private static boolean isRowOrdered(List<Row> rows)
-    {
-        DecoratedKey prev = null;
-        for (Row row : rows)
-        {
-            if (prev != null && prev.compareTo(row.key) > 0)
-                return false;
-            prev = row.key;
-        }
-        return true;
-    }
-
-    protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
-    {
-        for (int i = 0; i < rowsPerSSTable; i++)
-        {
-            String key = String.valueOf(i);
-            // create a row and update the birthdate value, test that the index query fetches the new version
-            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF);
-            cf.addColumn(column("c1", "1", 1L));
-            cf.addColumn(column("c2", "2", 1L));
-            RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key), cf);
-            rm.applyUnsafe();
-        }
-
-        cfs.forceBlockingFlush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f87a0f8..267353f 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -48,7 +49,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     Statics statics = new Statics();
 
     @BeforeClass
-    public static void loadSchema() throws IOException
+    public static void loadSchema() throws IOException, ConfigurationException
     {
         loadSchema(true);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index f066b9b..9df5d25 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -147,8 +147,7 @@ public class CompactionsTest extends SchemaLoader
 
         // check that the shadowed column is gone
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()));
-        scanner.seekTo(key);
+        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()), key);
         OnDiskAtomIterator iter = scanner.next();
         assertEquals(key, iter.getKey());
         assert iter.next() instanceof RangeTombstone;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index f1e689e..cce32df 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.base.Objects;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -55,24 +56,24 @@ public class LazilyCompactedRowTest extends SchemaLoader
         // compare eager and lazy compactions
         AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
                                                                   strategy.getScanners(sstables),
-                                                                  new PreCompactingController(cfs, sstables, gcBefore, false));
+                                                                  new PreCompactingController(cfs, sstables, gcBefore));
         AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
                                                                  strategy.getScanners(sstables),
-                                                                 new LazilyCompactingController(cfs, sstables, gcBefore, false));
-        assertBytes(cfs, sstables, eager, lazy);
+                                                                 new LazilyCompactingController(cfs, sstables, gcBefore));
+        assertBytes(cfs, eager, lazy);
 
         // compare eager and parallel-lazy compactions
         eager = new CompactionIterable(OperationType.UNKNOWN,
                                        strategy.getScanners(sstables),
-                                       new PreCompactingController(cfs, sstables, gcBefore, false));
+                                       new PreCompactingController(cfs, sstables, gcBefore));
         AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
                                                                              strategy.getScanners(sstables),
                                                                              new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
                                                                              0);
-        assertBytes(cfs, sstables, eager, parallel);
+        assertBytes(cfs, eager, parallel);
     }
 
-    private static void assertBytes(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
+    private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
     {
         CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
         CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
@@ -89,8 +90,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
             AbstractCompactedRow row2 = iter2.next();
             DataOutputBuffer out1 = new DataOutputBuffer();
             DataOutputBuffer out2 = new DataOutputBuffer();
-            row1.write(out1);
-            row2.write(out2);
+            row1.write(-1, out1);
+            row2.write(-1, out2);
 
             File tmpFile1 = File.createTempFile("lcrt1", null);
             File tmpFile2 = File.createTempFile("lcrt2", null);
@@ -104,28 +105,23 @@ public class LazilyCompactedRowTest extends SchemaLoader
             MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
             MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0);
 
-            // key isn't part of what CompactedRow writes, that's done by SSTW.append
-
-            // row size can differ b/c of bloom filter counts being different
-            long rowSize1 = in1.readLong();
-            long rowSize2 = in2.readLong();
-            assertEquals(rowSize1 + 8, out1.getLength());
-            assertEquals(rowSize2 + 8, out2.getLength());
+            // row key
+            assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
 
             // cf metadata
             ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
             ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
             cf1.delete(DeletionInfo.serializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT));
             cf2.delete(DeletionInfo.serializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT));
-            assert cf1.deletionInfo().equals(cf2.deletionInfo());
+            assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
             // columns
-            int columns = in1.readInt();
-            assert columns == in2.readInt();
-            for (int i = 0; i < columns; i++)
+            while (true)
             {
                 Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
                 Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
-                assert c1.equals(c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+                assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+                if (c1 == null)
+                    break;
             }
             // that should be everything
             assert in1.available() == 0;
@@ -137,8 +133,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
     {
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false));
-        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
+        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
         CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
         CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
 
@@ -291,7 +287,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
 
     private static class LazilyCompactingController extends CompactionController
     {
-        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
         {
             super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
@@ -305,7 +301,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
 
     private static class PreCompactingController extends CompactionController
     {
-        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
         {
             super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
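
With no stored column count, the test above drains both streams in lockstep and
relies on Guava's null-safe Objects.equal, so one stream hitting its end marker
before the other (null compared against a column) fails the assertion. The same
pattern in isolation, as a generic sketch:

import java.util.Iterator;

import com.google.common.base.Objects;

// Generic lockstep comparison of two sentinel-terminated sequences: a null on
// only one side fails, a null on both sides ends the loop.
class LockstepCompareSketch
{
    static <T> void assertSameSequence(Iterator<T> a, Iterator<T> b)
    {
        while (true)
        {
            T x = a.hasNext() ? a.next() : null;
            T y = b.hasNext() ? b.next() : null;
            assert Objects.equal(x, y) : x + " != " + y;
            if (x == null)
                break; // both sequences ended together
        }
    }
}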

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
deleted file mode 100644
index 135f444..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.io.sstable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.Util;
-
-public class SSTableTest extends SchemaLoader
-{
-    @Test
-    public void testSingleWrite() throws IOException
-    {
-        // write test data
-        ByteBuffer key = ByteBufferUtil.bytes(Integer.toString(1));
-        ByteBuffer cbytes = ByteBuffer.wrap(new byte[1024]);
-        new Random().nextBytes(cbytes.array());
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        cf.addColumn(new Column(cbytes, cbytes));
-
-        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
-        map.put(Util.dk(key), cf);
-        SSTableReader ssTable = SSTableUtils.prepare().cf("Standard1").write(map);
-
-        // verify
-        ByteBuffer bytes = Util.serializeForSSTable(cf);
-        verifySingle(ssTable, bytes, key);
-        ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
-        verifySingle(ssTable, bytes, key);
-    }
-
-    private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException
-    {
-        RandomAccessReader file = sstable.openDataReader();
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
-        assert key.equals(ByteBufferUtil.readWithShortLength(file));
-        int size = (int) file.readLong();
-        byte[] bytes2 = new byte[size];
-        file.readFully(bytes2);
-        assert ByteBuffer.wrap(bytes2).equals(bytes);
-    }
-
-    @Test
-    public void testManyWrites() throws IOException
-    {
-        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
-        SortedMap<ByteBuffer, ByteBuffer> bytesMap = new TreeMap<ByteBuffer, ByteBuffer>();
-        //for (int i = 100; i < 1000; ++i)
-        for (int i = 100; i < 300; ++i)
-        {
-            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard2");
-            ByteBuffer bytes = ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i));
-            cf.addColumn(new Column(bytes, bytes));
-            map.put(Util.dk(Integer.toString(i)), cf);
-            bytesMap.put(ByteBufferUtil.bytes(Integer.toString(i)), Util.serializeForSSTable(cf));
-        }
-
-        // write
-        SSTableReader ssTable = SSTableUtils.prepare().cf("Standard2").write(map);
-
-        // verify
-        verifyMany(ssTable, bytesMap);
-        ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
-        verifyMany(ssTable, bytesMap);
-
-        Set<Component> live = SSTable.componentsFor(ssTable.descriptor);
-        assert !live.isEmpty() : "SSTable has no live components";
-        Set<Component> temp = SSTable.componentsFor(ssTable.descriptor.asTemporary(true));
-        assert temp.isEmpty() : "SSTable has unexpected temp components";
-    }
-
-    private void verifyMany(SSTableReader sstable, Map<ByteBuffer, ByteBuffer> map) throws IOException
-    {
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
-        //Collections.shuffle(keys);
-        RandomAccessReader file = sstable.openDataReader();
-        for (ByteBuffer key : keys)
-        {
-            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
-            assert key.equals( ByteBufferUtil.readWithShortLength(file));
-            int size = (int) file.readLong();
-            byte[] bytes2 = new byte[size];
-            file.readFully(bytes2);
-            assert Arrays.equals(bytes2, map.get(key).array());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 73abbe2..c3e83cb 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -50,7 +50,7 @@ public class LeaveAndBootstrapTest
     private static IPartitioner oldPartitioner;
 
     @BeforeClass
-    public static void setup() throws IOException
+    public static void setup() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index e30bbde..5454127 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -57,7 +57,7 @@ public class MoveTest
      * So instead of extending SchemaLoader, we call its method below.
      */
     @BeforeClass
-    public static void setup() throws IOException
+    public static void setup() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 278c8f1..4f3217c 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -65,7 +65,7 @@ public class RemoveTest
     UUID removalId;
 
     @BeforeClass
-    public static void setupClass() throws IOException
+    public static void setupClass() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();


[2/2] git commit: Single-pass compaction patch by jbellis; reviewed by marcuse for CASSANDRA-4180

Posted by jb...@apache.org.
Single-pass compaction
patch by jbellis; reviewed by marcuse for CASSANDRA-4180


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9bd531b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9bd531b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9bd531b

Branch: refs/heads/trunk
Commit: a9bd531bc9a286cc8e81a800aaa29c1fead62dcd
Parents: 3ca160e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue May 7 14:20:53 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 7 14:21:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/Column.java       |   27 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    4 +-
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   46 +++-
 src/java/org/apache/cassandra/db/OnDiskAtom.java   |    5 +-
 .../apache/cassandra/db/RowIteratorFactory.java    |    3 +-
 .../columniterator/ICountableColumnIterator.java   |   25 --
 .../db/columniterator/IndexedSliceReader.java      |   29 +-
 .../db/columniterator/SSTableNamesIterator.java    |   10 +-
 .../db/columniterator/SimpleSliceReader.java       |   32 +--
 .../db/compaction/AbstractCompactedRow.java        |   18 +-
 .../cassandra/db/compaction/CompactionManager.java |   53 ++--
 .../cassandra/db/compaction/CompactionTask.java    |    5 +-
 .../db/compaction/LazilyCompactedRow.java          |  111 ++------
 .../db/compaction/ParallelCompactionIterable.java  |   28 +--
 .../cassandra/db/compaction/PrecompactedRow.java   |   40 +---
 .../apache/cassandra/db/compaction/Scrubber.java   |   49 ++--
 .../apache/cassandra/io/sstable/Descriptor.java    |    3 +
 .../io/sstable/SSTableBoundedScanner.java          |   95 ------
 .../io/sstable/SSTableIdentityIterator.java        |   99 ++-----
 .../apache/cassandra/io/sstable/SSTableReader.java |   13 +-
 .../cassandra/io/sstable/SSTableScanner.java       |  184 +++++-------
 .../apache/cassandra/io/sstable/SSTableWriter.java |  134 ++++-----
 .../cassandra/streaming/IncomingStreamReader.java  |   33 +--
 .../org/apache/cassandra/tools/SSTableExport.java  |   54 ++--
 test/unit/org/apache/cassandra/SchemaLoader.java   |   21 +-
 test/unit/org/apache/cassandra/Util.java           |   18 --
 test/unit/org/apache/cassandra/db/ScrubTest.java   |  222 ---------------
 .../apache/cassandra/db/SerializationsTest.java    |    3 +-
 .../cassandra/db/compaction/CompactionsTest.java   |    3 +-
 .../cassandra/io/LazilyCompactedRowTest.java       |   44 ++--
 .../apache/cassandra/io/sstable/SSTableTest.java   |  112 --------
 .../cassandra/service/LeaveAndBootstrapTest.java   |    2 +-
 .../org/apache/cassandra/service/MoveTest.java     |    2 +-
 .../org/apache/cassandra/service/RemoveTest.java   |    2 +-
 35 files changed, 424 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60accb1..4cd7b6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Single-pass compaction (CASSANDRA-4180)
  * Removed token range bisection (CASSANDRA-5518)
  * Removed compatibility with pre-1.2.5 sstables and network messages
    (CASSANDRA-5511)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index bf4f5b3..a8c1f94 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -50,33 +52,34 @@ public class Column implements OnDiskAtom
         return OnDiskAtom.Serializer.instance;
     }
 
+    /**
+     * For 2.0-formatted sstables (where column count is not stored), @param count should be Integer.MAX_VALUE,
+     * and we will look for the end-of-row column name marker instead of relying on the count.
+     */
     public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
     {
-        return new Iterator<OnDiskAtom>()
+        return new AbstractIterator<OnDiskAtom>()
         {
             int i = 0;
 
-            public boolean hasNext()
+            protected OnDiskAtom computeNext()
             {
-                return i < count;
-            }
+                if (i++ >= count)
+                    return endOfData();
 
-            public OnDiskAtom next()
-            {
-                ++i;
+                OnDiskAtom atom;
                 try
                 {
-                    return onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+                    atom = onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
                 }
                 catch (IOException e)
                 {
                     throw new IOError(e);
                 }
-            }
+                if (atom == null)
+                    return endOfData();
 
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
+                return atom;
             }
         };
     }
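
The switch to Guava's AbstractIterator above is what lets one iterator serve both formats: it stops either when the legacy column count is exhausted or when the deserializer hands back null at the end-of-row marker. A minimal standalone sketch of that dual-termination pattern -- CountOrSentinelIterator, Atom, and the Supplier-based source are placeholders here, not Cassandra APIs:

    import java.util.Iterator;

    import com.google.common.base.Supplier;
    import com.google.common.collect.AbstractIterator;

    public class CountOrSentinelIterator
    {
        // Placeholder for the deserialized unit; stands in for OnDiskAtom.
        public interface Atom {}

        /**
         * Produces atoms until either `count` have been returned or the source
         * yields null (the end-of-row sentinel).  Passing Integer.MAX_VALUE as
         * count defers entirely to the sentinel, as post-4180 sstables do.
         */
        public static Iterator<Atom> atoms(final Supplier<Atom> source, final int count)
        {
            return new AbstractIterator<Atom>()
            {
                private int i = 0;

                protected Atom computeNext()
                {
                    if (i++ >= count)
                        return endOfData();
                    Atom atom = source.get(); // null means END_OF_ROW was read
                    return atom == null ? endOfData() : atom;
                }
            };
        }
    }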

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b07d97c..bbbb554 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1480,6 +1480,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
 
+            // TODO: this could be pushed into SSTableScanner
             return new AbstractScanIterator()
             {
                 protected Row computeNext()
@@ -1498,7 +1499,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     if (!range.contains(key))
                         return computeNext();
 
-                    logger.trace("scanned {}", key);
+                    if (logger.isTraceEnabled())
+                        logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
 
                     return current;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 57bd995..bcc5c2f 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
 {
@@ -65,13 +66,21 @@ public class ColumnIndex
         private final DataOutput output;
         private final RangeTombstone.Tracker tombstoneTracker;
         private int atomCount;
+        private final ByteBuffer key;
+        private final DeletionInfo deletionInfo;
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
                        DataOutput output,
                        boolean fromStream)
         {
-            this.indexOffset = rowHeaderSize(key, cf.deletionInfo());
+            assert cf != null;
+            assert key != null;
+            assert output != null;
+
+            this.key = key;
+            deletionInfo = cf.deletionInfo();
+            this.indexOffset = rowHeaderSize(key, deletionInfo);
             this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
             this.output = output;
             this.tombstoneTracker = fromStream ? null : new RangeTombstone.Tracker(cf.getComparator());
@@ -94,9 +103,7 @@ public class ColumnIndex
             // TODO fix constantSize when changing the native constants.
             int keysize = key.remaining();
             return typeSizes.sizeof((short) keysize) + keysize          // Row key
-                 + typeSizes.sizeof(0L)                                 // Row data size
-                 + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes)
-                 + typeSizes.sizeof(0);                                 // Column count
+                 + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes);
         }
 
         public RangeTombstone.Tracker tombstoneTracker()
@@ -119,6 +126,7 @@ public class ColumnIndex
          */
         public ColumnIndex build(ColumnFamily cf) throws IOException
         {
+            // cf has disentangled the columns and range tombstones, so we need to re-interleave them in comparator order
             Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
             RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             Comparator<ByteBuffer> comparator = cf.getComparator();
@@ -138,15 +146,22 @@ public class ColumnIndex
                 add(tombstone);
                 tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             }
-            return build();
+            ColumnIndex index = build();
+
+            finish();
+
+            return index;
         }
 
         public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
         {
             for (OnDiskAtom c : columns)
                 add(c);
+            ColumnIndex index = build();
+
+            finish();
 
-            return build();
+            return index;
         }
 
         public void add(OnDiskAtom column) throws IOException
@@ -177,8 +192,8 @@ public class ColumnIndex
                 lastBlockClosing = column;
             }
 
-            if (output != null)
-                atomSerializer.serializeForSSTable(column, output);
+            maybeWriteRowHeader();
+            atomSerializer.serializeForSSTable(column, output);
 
             // TODO: Should deal with removing unneeded tombstones
             if (tombstoneTracker != null)
@@ -187,6 +202,15 @@ public class ColumnIndex
             lastColumn = column;
         }
 
+        private void maybeWriteRowHeader() throws IOException
+        {
+            if (lastColumn == null)
+            {
+                ByteBufferUtil.writeWithShortLength(key, output);
+                DeletionInfo.serializer().serializeForSSTable(deletionInfo, output);
+            }
+        }
+
         public ColumnIndex build()
         {
             // all columns were GC'd after all
@@ -204,5 +228,11 @@ public class ColumnIndex
             assert result.columnsIndex.size() > 0;
             return result;
         }
+
+        public void finish() throws IOException
+        {
+            if (!deletionInfo.equals(DeletionInfo.LIVE))
+                maybeWriteRowHeader();
+        }
     }
 }
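
The Builder changes are the crux of the single-pass layout: the row header (key plus deletion info) is only written when the first atom arrives, and finish() forces it out for atom-less rows that still carry a row-level tombstone. A condensed sketch of that deferral logic, assuming pre-serialized byte[] placeholders rather than the real ColumnFamily machinery:

    import java.io.DataOutput;
    import java.io.IOException;

    class DeferredHeaderWriter
    {
        private final byte[] header;        // row key + deletion info, pre-serialized
        private final boolean hasTombstone; // i.e. deletionInfo != LIVE
        private final DataOutput out;
        private boolean headerWritten;

        DeferredHeaderWriter(byte[] header, boolean hasTombstone, DataOutput out)
        {
            this.header = header;
            this.hasTombstone = hasTombstone;
            this.out = out;
        }

        void add(byte[] serializedAtom) throws IOException
        {
            maybeWriteHeader(); // the first atom drags the header out with it
            out.write(serializedAtom);
        }

        // Called after iteration: a row with no atoms but a row-level
        // tombstone still needs its header preserved on disk.
        void finish() throws IOException
        {
            if (hasTombstone)
                maybeWriteHeader();
        }

        private void maybeWriteHeader() throws IOException
        {
            if (!headerWritten)
            {
                out.write(header);
                headerWritten = true;
            }
        }
    }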

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 7cf7914..2fdb7ad 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -73,7 +73,10 @@ public interface OnDiskAtom
         {
             ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
             if (name.remaining() <= 0)
-                throw ColumnSerializer.CorruptColumnException.create(in, name);
+            {
+                // SSTableWriter.END_OF_ROW
+                return null;
+            }
 
             int b = in.readUnsignedByte();
             if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
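
On the read path the sentinel costs nothing extra: a zero-length name where the next column name should be (SSTableWriter.END_OF_ROW is a short 0) makes the deserializer return null instead of throwing. A hedged sketch of a reader loop built on that convention; parseAtom is a placeholder for the real flag-driven column/tombstone parsing:

    import java.io.DataInput;
    import java.io.IOException;

    final class EndOfRowReader
    {
        /** Consumes atoms until the zero-length-name marker; returns how many were read. */
        static int countAtoms(DataInput in) throws IOException
        {
            int atoms = 0;
            while (true)
            {
                int nameLength = in.readUnsignedShort(); // same encoding as writeWithShortLength
                if (nameLength == 0)
                    break;                               // SSTableWriter.END_OF_ROW
                byte[] name = new byte[nameLength];
                in.readFully(name);
                parseAtom(in);                           // placeholder: consume the rest of the atom
                atoms++;
            }
            return atoms;
        }

        private static void parseAtom(DataInput in) throws IOException
        {
            in.readUnsignedByte(); // elided: the real logic branches on this serialization flag byte
        }
    }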

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 1bc506b..4588146 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -69,8 +69,7 @@ public class RowIteratorFactory
 
         for (SSTableReader sstable : sstables)
         {
-            final SSTableScanner scanner = sstable.getScanner(filter);
-            scanner.seekTo(startWith);
+            final SSTableScanner scanner = sstable.getScanner(filter, startWith);
             iterators.add(scanner);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
deleted file mode 100644
index 0b5fa97..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-public interface ICountableColumnIterator extends OnDiskAtomIterator
-{
-    public int getColumnCount();
-
-    public void reset();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index ffbca1a..ccf9149 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -74,16 +74,15 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         {
             Descriptor.Version version = sstable.descriptor.version;
             this.indexes = indexEntry.columnsIndex();
+            emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
             if (indexes.isEmpty())
             {
-                setToRowStart(sstable, indexEntry, input);
-                this.emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
+                setToRowStart(indexEntry, input);
                 emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
                 fetcher = new SimpleBlockFetcher();
             }
             else
             {
-                emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
                 emptyColumnFamily.delete(indexEntry.deletionTime());
                 fetcher = new IndexedBlockFetcher(indexEntry.position);
             }
@@ -98,19 +97,20 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     /**
      * Sets the seek position to the start of the row for column scanning.
      */
-    private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException
+    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
     {
-        if (input == null)
+        if (in == null)
         {
-            this.file = sstable.getFileDataInput(indexEntry.position);
+            this.file = sstable.getFileDataInput(rowEntry.position);
         }
         else
         {
-            this.file = input;
-            input.seek(indexEntry.position);
+            this.file = in;
+            in.seek(rowEntry.position);
         }
         sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-        file.readLong();
+        if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+            file.readLong();
     }
 
     public ColumnFamily getColumnFamily()
@@ -210,7 +210,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     private class IndexedBlockFetcher extends BlockFetcher
     {
         // where this row starts
-        private final long basePosition;
+        private final long columnsStart;
 
         // the index entry for the next block to deserialize
         private int nextIndexIdx = -1;
@@ -222,10 +222,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         // may still match a slice
         private final Deque<OnDiskAtom> prefetched;
 
-        public IndexedBlockFetcher(long basePosition)
+        public IndexedBlockFetcher(long columnsStart)
         {
             super(-1);
-            this.basePosition = basePosition;
+            this.columnsStart = columnsStart;
             this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
             setNextSlice();
         }
@@ -330,7 +330,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
 
             /* seek to the correct offset to the data, and calculate the data size */
-            long positionToSeek = basePosition + currentIndex.offset;
+            long positionToSeek = columnsStart + currentIndex.offset;
 
             // With new promoted indexes, our first seek in the data file will happen at that point.
             if (file == null)
@@ -420,7 +420,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             // We remember when we are within a slice to avoid some comparisons
             boolean inSlice = false;
 
-            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
             while (atomIterator.hasNext())
             {
                 OnDiskAtom column = atomIterator.next();
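
This reader, SSTableNamesIterator, and SimpleSliceReader below all gate the legacy header reads on the same flag. The guarded-read pattern in isolation, with a stand-in Version type for Descriptor.Version:

    import java.io.DataInput;
    import java.io.IOException;

    final class LegacyHeaderReads
    {
        static final class Version
        {
            final boolean hasRowSizeAndColumnCount; // true only for pre-"ja" sstables
            Version(boolean legacy) { hasRowSizeAndColumnCount = legacy; }
        }

        /** Skips the legacy row size, then returns the count to hand the atom iterator. */
        static int columnCount(DataInput in, Version version) throws IOException
        {
            if (version.hasRowSizeAndColumnCount)
                in.readLong();             // legacy 8-byte row data size
            // (deletion info is deserialized between these two reads in the real code)
            return version.hasRowSizeAndColumnCount
                 ? in.readInt()            // legacy 4-byte column count
                 : Integer.MAX_VALUE;      // new format: rely on END_OF_ROW instead
        }
    }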

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index bc0af30..f0e6dc7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -115,7 +115,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
 
             DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
             assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
-            file.readLong();
+            if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                file.readLong();
         }
 
         indexList = indexEntry.columnsIndex();
@@ -142,7 +143,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
         if (indexList.isEmpty())
         {
-            readSimpleColumns(file, columns, result);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            readSimpleColumns(file, columns, result, columnCount);
         }
         else
         {
@@ -153,9 +155,9 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         iter = result.iterator();
     }
 
-    private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result) throws IOException
+    private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result, int columnCount) throws IOException
     {
-        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
         int n = 0;
         while (atomIterator.hasNext())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index efbb92c..d0d74c6 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -30,21 +32,22 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
+
     private final FileDataInput file;
     private final boolean needsClosing;
     private final ByteBuffer finishColumn;
     private final AbstractType<?> comparator;
     private final ColumnFamily emptyColumnFamily;
-    private FileMark mark;
     private final Iterator<OnDiskAtom> atomIterator;
 
     public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
     {
+        logger.debug("Slicing {}", sstable);
         this.finishColumn = finishColumn;
         this.comparator = sstable.metadata.comparator;
         try
@@ -61,16 +64,17 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
                 this.needsClosing = false;
             }
 
+            Descriptor.Version version = sstable.descriptor.version;
+
             // Skip key and data size
             ByteBufferUtil.skipShortLength(file);
-            file.readLong();
-
-            Descriptor.Version version = sstable.descriptor.version;
+            if (version.hasRowSizeAndColumnCount)
+                file.readLong();
 
             emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
-            emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
-            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), version);
-            mark = file.mark();
+            emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
+            int columnCount = version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
         }
         catch (IOException e)
         {
@@ -84,20 +88,10 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
         if (!atomIterator.hasNext())
             return endOfData();
 
-        OnDiskAtom column;
-        try
-        {
-            file.reset(mark);
-            column = atomIterator.next();
-        }
-        catch (IOException e)
-        {
-            throw new CorruptSSTableException(e, file.getPath());
-        }
+        OnDiskAtom column = atomIterator.next();
         if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
             return endOfData();
 
-        mark = file.mark();
         return column;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 56e9353..cd2952a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.ColumnIndex;
@@ -48,7 +49,7 @@ public abstract class AbstractCompactedRow implements Closeable
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
      */
-    public abstract long write(DataOutput out) throws IOException;
+    public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 
     /**
      * update @param digest with the data bytes of the row (not including row key or row size).
@@ -59,23 +60,8 @@ public abstract class AbstractCompactedRow implements Closeable
     public abstract void update(MessageDigest digest);
 
     /**
-     * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved
-     */
-    public abstract boolean isEmpty();
-
-    /**
      * @return aggregate information about the columns in this row.  Some fields may
      * contain default values if computing their value would require extra effort we're not willing to make.
      */
     public abstract ColumnStats columnStats();
-
-    /**
-     * @return the compacted row deletion infos.
-     */
-    public abstract DeletionInfo deletionInfo();
-
-    /**
-     * @return the column index for this row.
-     */
-    public abstract ColumnIndex index();
 }
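
With isEmpty(), deletionInfo(), and index() gone, the whole contract lives in write(): it either persists the row and returns its index entry, or persists nothing and returns null. A minimal caller-side sketch of that contract; Writer, Row, and IndexEntry are placeholder interfaces, not the real classes:

    final class AppendLoop
    {
        interface IndexEntry {}
        interface Row {}
        interface Writer { IndexEntry append(Row row); }

        /** Appends rows, counting only those that survived compaction. */
        static long appendAll(Writer writer, Iterable<Row> rows)
        {
            long written = 0;
            for (Row row : rows)
            {
                if (writer.append(row) != null) // null: the row was purged entirely
                    written++;
            }
            return written;
        }
    }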

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 877d349..758068c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -482,9 +482,6 @@ public class CompactionManager implements CompactionManagerMBean
             if (logger.isDebugEnabled())
                 logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
-            SSTableWriter writer = null;
-            SSTableReader newSstable = null;
-
             logger.info("Cleaning up " + sstable);
             // Calculate the expected compacted filesize
             long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable), OperationType.CLEANUP);
@@ -498,6 +495,11 @@ public class CompactionManager implements CompactionManagerMBean
 
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             metrics.beginCompaction(ci);
+            SSTableWriter writer = createWriter(cfs,
+                                                compactionFileLocation,
+                                                expectedBloomFilterSize,
+                                                sstable);
+            SSTableReader newSstable = null;
             try
             {
                 while (scanner.hasNext())
@@ -508,11 +510,8 @@ public class CompactionManager implements CompactionManagerMBean
                     if (Range.isInRanges(row.getKey().token, ranges))
                     {
                         AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                        if (compactedRow.isEmpty())
-                            continue;
-                        writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, sstable);
-                        writer.append(compactedRow);
-                        totalkeysWritten++;
+                        if (writer.append(compactedRow) != null)
+                            totalkeysWritten++;
                     }
                     else
                     {
@@ -553,13 +552,14 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
                 }
-                if (writer != null)
+                if (totalkeysWritten > 0)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+                else
+                    writer.abort();
             }
             catch (Throwable e)
             {
-                if (writer != null)
-                    writer.abort();
+                writer.abort();
                 throw Throwables.propagate(e);
             }
             finally
@@ -589,23 +589,17 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
-                                                  OperationType compactionType,
-                                                  File compactionFileLocation,
-                                                  int expectedBloomFilterSize,
-                                                  SSTableWriter writer,
-                                                  SSTableReader sstable)
+    public static SSTableWriter createWriter(ColumnFamilyStore cfs,
+                                             File compactionFileLocation,
+                                             int expectedBloomFilterSize,
+                                             SSTableReader sstable)
     {
-        if (writer == null)
-        {
-            FileUtils.createDirectory(compactionFileLocation);
-            writer = new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
-                                       expectedBloomFilterSize,
-                                       cfs.metadata,
-                                       cfs.partitioner,
-                                       SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
-        }
-        return writer;
+        FileUtils.createDirectory(compactionFileLocation);
+        return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+                                 expectedBloomFilterSize,
+                                 cfs.metadata,
+                                 cfs.partitioner,
+                                 SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
     }
 
     /**
@@ -661,10 +655,7 @@ public class CompactionManager implements CompactionManagerMBean
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
                 AbstractCompactedRow row = iter.next();
-                if (row.isEmpty())
-                    row.close();
-                else
-                    validator.add(row);
+                validator.add(row);
             }
             validator.complete();
         }
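
Since append() now reports emptiness itself, cleanup can create its writer eagerly and decide at the end whether anything is worth keeping. The resulting lifecycle, sketched with placeholder types:

    final class CleanupLifecycle
    {
        interface Reader {}
        interface Row {}
        interface Writer
        {
            Object append(Row row);   // null => row compacted away
            Reader closeAndOpenReader();
            void abort();             // discards the temp sstable files
        }

        static Reader cleanup(Writer writer, Iterable<Row> rows)
        {
            long kept = 0;
            try
            {
                for (Row row : rows)
                    if (writer.append(row) != null)
                        kept++;
                if (kept > 0)
                    return writer.closeAndOpenReader();
                writer.abort();       // nothing written: no sstable to open
                return null;
            }
            catch (RuntimeException e)
            {
                writer.abort();       // never leave a half-written temp sstable
                throw e;
            }
        }
    }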

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3b4fdc8..e32956b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -156,7 +156,8 @@ public class CompactionTask extends AbstractCompactionTask
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                 AbstractCompactedRow row = iter.next();
-                if (row.isEmpty())
+                RowIndexEntry indexEntry = writer.append(row);
+                if (indexEntry == null)
                 {
                     controller.invalidateCachedRow(row.key);
                     row.close();
@@ -166,8 +167,6 @@ public class CompactionTask extends AbstractCompactionTask
                 // If the row is cached, we call removeDeleted on it at read time to keep query results coherent,
                 // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
                 controller.removeDeletedInCache(row.key);
-
-                RowIndexEntry indexEntry = writer.append(row);
                 totalkeysWritten++;
 
                 if (DatabaseDescriptor.getPreheatKeyCache())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index b799f0d..444ee11 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -29,13 +29,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -45,35 +46,30 @@ import org.apache.cassandra.utils.StreamingHistogram;
  * of the rows being compacted, and merging them as it does so.  So the most we have
  * in memory at a time is the bloom filter, the index, and one column from each
  * pre-compaction row.
- *
- * When write() or update() is called, a second pass is made over the pre-compaction
- * rows to write the merged columns or update the hash, again with at most one column
- * from each row deserialized at a time.
  */
 public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
 {
     private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
 
-    private final List<? extends ICountableColumnIterator> rows;
+    private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
     private ColumnFamily emptyColumnFamily;
     private Reducer reducer;
-    private final ColumnStats columnStats;
-    private long columnSerializedSize;
+    private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
-    private ColumnIndex columnsIndex;
     private final SecondaryIndexManager.Updater indexer;
+    private long maxDelTimestamp;
 
-    public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
+    public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
         super(rows.get(0).getKey());
         this.rows = rows;
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        long maxDelTimestamp = Long.MIN_VALUE;
+        maxDelTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
@@ -85,58 +81,45 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 emptyColumnFamily.delete(cf);
         }
         this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+    }
+
+    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+    {
+        assert !closed;
 
+        ColumnIndex columnsIndex;
         try
         {
-            indexAndWrite(null);
+            indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
+            columnsIndex = indexBuilder.build(this);
+            if (columnsIndex.columnsIndex.isEmpty())
+            {
+                boolean cfIrrelevant = shouldPurge
+                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+                if (cfIrrelevant)
+                    return null;
+            }
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
-        // reach into the reducer used during iteration to get column count, size, max column timestamp
+        // reach into the reducer (created during iteration) to get column count, size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
-        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, 
-                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen, 
+        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
+                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
                                       reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
         );
-        columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
         reducer = null;
-    }
-
-    private void indexAndWrite(DataOutput out) throws IOException
-    {
-        this.indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-        this.columnsIndex = indexBuilder.build(this);
-    }
-
-    public long write(DataOutput out) throws IOException
-    {
-        assert !closed;
-
-        DataOutputBuffer clockOut = new DataOutputBuffer();
-        DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), clockOut);
-
-        long dataSize = clockOut.getLength() + columnSerializedSize;
-        if (logger.isDebugEnabled())
-            logger.debug(String.format("clock / column sizes are %s / %s", clockOut.getLength(), columnSerializedSize));
-        assert dataSize > 0;
-        out.writeLong(dataSize);
-        out.write(clockOut.getData(), 0, clockOut.getLength());
-        out.writeInt(indexBuilder.writtenAtomCount());
 
-        // We rebuild the column index uselessly, but we need to do that because range tombstone markers depend
-        // on indexing. If we're able to remove the two-phase compaction, we'll avoid that.
-        indexAndWrite(out);
-
-        long secondPassColumnSize = reducer == null ? 0 : reducer.serializedSize;
-        assert secondPassColumnSize == columnSerializedSize
-               : "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
+        out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
-        return dataSize;
+
+        return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex);
     }
 
     public void update(MessageDigest digest)
@@ -150,7 +133,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), out);
-            out.writeInt(columnStats.columnCount);
             digest.update(out.getData(), 0, out.getLength());
         }
         catch (IOException e)
@@ -158,30 +140,14 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             throw new AssertionError(e);
         }
 
+        // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
+        indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
         Iterator<OnDiskAtom> iter = iterator();
         while (iter.hasNext())
-        {
             iter.next().updateDigest(digest);
-        }
         close();
     }
 
-    public boolean isEmpty()
-    {
-        boolean cfIrrelevant = shouldPurge
-                             ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                             : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-        return cfIrrelevant && columnStats.columnCount == 0;
-    }
-
-    public int getEstimatedColumnCount()
-    {
-        int n = 0;
-        for (ICountableColumnIterator row : rows)
-            n += row.getColumnCount();
-        return n;
-    }
-
     public AbstractType<?> getComparator()
     {
         return emptyColumnFamily.getComparator();
@@ -189,8 +155,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
     public Iterator<OnDiskAtom> iterator()
     {
-        for (ICountableColumnIterator row : rows)
-            row.reset();
         reducer = new Reducer();
         Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
         return Iterators.filter(iter, Predicates.notNull());
@@ -217,19 +181,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         closed = true;
     }
 
-    public DeletionInfo deletionInfo()
-    {
-        return emptyColumnFamily.deletionInfo();
-    }
-
-    /**
-     * @return the column index for this row.
-     */
-    public ColumnIndex index()
-    {
-        return columnsIndex;
-    }
-
     private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
         // all columns reduced together will have the same name, so there will only be one column
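
Put together, write() now makes exactly one pass: the index builder streams the merged atoms (and, lazily, the header) straight to the data file, an END_OF_ROW short terminates the row, and the method returns the index entry or null. The shape of that flow, heavily reduced to placeholders:

    import java.io.DataOutput;
    import java.io.IOException;

    abstract class SinglePassWrite
    {
        interface ColumnIndex {}
        interface RowEntry {}

        RowEntry write(long startPosition, DataOutput out) throws IOException
        {
            // Streams header + atoms to `out` while building the index (one pass).
            ColumnIndex index = buildIndexWhileWriting(out);
            if (index == null)          // empty row, no tombstone: header was never written
                return null;
            out.writeShort(0x0000);     // SSTableWriter.END_OF_ROW terminates the row
            return entryFor(startPosition, index);
        }

        abstract ColumnIndex buildIndexWhileWriting(DataOutput out) throws IOException;
        abstract RowEntry entryFor(long position, ColumnIndex index);
    }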

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 718b1e3..c0bce30 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.utils.*;
 
@@ -164,7 +164,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
             }
 
-            List<ICountableColumnIterator> iterators = new ArrayList<ICountableColumnIterator>(rows.size());
+            List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
             for (RowContainer container : rows)
                 iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
             return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
@@ -203,7 +203,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             }
         }
 
-        private class DeserializedColumnIterator implements ICountableColumnIterator
+        private class DeserializedColumnIterator implements OnDiskAtomIterator
         {
             private final Row row;
             private Iterator<Column> iter;
@@ -224,16 +224,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 return row.key;
             }
 
-            public int getColumnCount()
-            {
-                return row.cf.getColumnCount();
-            }
-
-            public void reset()
-            {
-                iter = row.cf.iterator();
-            }
-
             public void close() throws IOException {}
 
             public boolean hasNext()
@@ -319,7 +309,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     /**
      * a wrapper around SSTII that notifies the given condition when it is closed
      */
-    private static class NotifyingSSTableIdentityIterator implements ICountableColumnIterator
+    private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
     {
         private final SSTableIdentityIterator wrapped;
         private final Condition condition;
@@ -340,16 +330,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             return wrapped.getKey();
         }
 
-        public int getColumnCount()
-        {
-            return wrapped.getColumnCount();
-        }
-
-        public void reset()
-        {
-            wrapped.reset();
-        }
-
         public void close() throws IOException
         {
             wrapped.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 1e7285a..1b5d4a9 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.MergeIterator;
 public class PrecompactedRow extends AbstractCompactedRow
 {
     private final ColumnFamily compactedCf;
-    private ColumnIndex columnIndex;
 
     /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
     public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
@@ -152,21 +152,12 @@ public class PrecompactedRow extends AbstractCompactedRow
         filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
     }
 
-    public long write(DataOutput out) throws IOException
+    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
     {
-        assert compactedCf != null;
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        ColumnIndex.Builder builder = new ColumnIndex.Builder(compactedCf, key.key, buffer);
-        columnIndex = builder.build(compactedCf);
-
-        TypeSizes typeSizes = TypeSizes.NATIVE;
-        long delSize = DeletionTime.serializer.serializedSize(compactedCf.deletionInfo().getTopLevelDeletion(), typeSizes);
-        long dataSize = buffer.getLength() + delSize + typeSizes.sizeof(0);
-        out.writeLong(dataSize);
-        DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), out);
-        out.writeInt(builder.writtenAtomCount());
-        out.write(buffer.getData(), 0, buffer.getLength());
-        return dataSize;
+        if (compactedCf == null)
+            return null;
+
+        return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
     }
 
     public void update(MessageDigest digest)
@@ -176,7 +167,6 @@ public class PrecompactedRow extends AbstractCompactedRow
         try
         {
             DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), buffer);
-            buffer.writeInt(compactedCf.getColumnCount());
             digest.update(buffer.getData(), 0, buffer.getLength());
         }
         catch (IOException e)
@@ -186,11 +176,6 @@ public class PrecompactedRow extends AbstractCompactedRow
         compactedCf.updateDigest(digest);
     }
 
-    public boolean isEmpty()
-    {
-        return compactedCf == null;
-    }
-
     public ColumnStats columnStats()
     {
         return compactedCf.getColumnStats();
@@ -207,18 +192,5 @@ public class PrecompactedRow extends AbstractCompactedRow
         return compactedCf;
     }
 
-    public DeletionInfo deletionInfo()
-    {
-        return compactedCf.deletionInfo();
-    }
-
-    /**
-     * @return the column index for this row.
-     */
-    public ColumnIndex index()
-    {
-        return columnIndex;
-    }
-
     public void close() { }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 421385a..5809741 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -111,7 +111,7 @@ public class Scrubber implements Closeable
             }
 
             // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
             AbstractCompactedRow prevRow = null;
 
@@ -168,25 +168,21 @@ public class Scrubber implements Closeable
                         throw new IOError(new IOException("Unable to read row key from data file"));
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+
+                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
                     AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (compactedRow.isEmpty())
+                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
                     {
-                        emptyRows++;
+                        outOfOrderRows.add(compactedRow);
+                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        continue;
                     }
-                    else
-                    {
-                        if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
-                        {
-                            outOfOrderRows.add(compactedRow);
-                            outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
-                            continue;
-                        }
 
-                        writer.append(compactedRow);
-                        prevRow = compactedRow;
+                    if (writer.append(compactedRow) == null)
+                        emptyRows++;
+                    else
                         goodRows++;
-                    }
+                    prevRow = compactedRow;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -204,24 +200,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
                             AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (compactedRow.isEmpty())
+                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
                             {
-                                emptyRows++;
+                                outOfOrderRows.add(compactedRow);
+                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                continue;
                             }
+                            if (writer.append(compactedRow) == null)
+                                emptyRows++;
                             else
-                            {
-                                if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
-                                {
-                                    outOfOrderRows.add(compactedRow);
-                                    outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
-                                    continue;
-                                }
-                                writer.append(compactedRow);
-                                prevRow = compactedRow;
                                 goodRows++;
-                            }
+                            prevRow = compactedRow;
                         }
                         catch (Throwable th2)
                         {
@@ -266,7 +257,7 @@ public class Scrubber implements Closeable
 
         if (!outOfOrderRows.isEmpty())
         {
-            SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
             for (AbstractCompactedRow row : outOfOrderRows)
                 inOrderWriter.append(row);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index e2a7a18..fdc99b9 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -53,6 +53,7 @@ public class Descriptor
         //               into this new format)
         //             tracks max local deletiontime in sstable metadata
         //             records bloom_filter_fp_chance in metadata component
+        //             remove data size and column count from data file (CASSANDRA-4180)
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -63,6 +64,7 @@ public class Descriptor
         public final boolean tracksMaxLocalDeletionTime;
         public final boolean hasBloomFilterFPChance;
         public final boolean offHeapSummaries;
+        public final boolean hasRowSizeAndColumnCount;
 
         public Version(String version)
         {
@@ -72,6 +74,7 @@ public class Descriptor
             hasSuperColumns = version.compareTo("ja") < 0;
             hasBloomFilterFPChance = version.compareTo("ja") >= 0;
             offHeapSummaries = version.compareTo("ja") >= 0;
+            hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
         }
 
         /**
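
Format capabilities hang off a lexicographic comparison of the two-letter version string; "ja" is the 2.0 format introduced here, so anything sorting before it still carries per-row sizes and counts. A tiny sketch of that derivation ("ib" is just an example of a pre-"ja" version string; run with -ea for the asserts):

    final class VersionFlags
    {
        final boolean hasRowSizeAndColumnCount;

        VersionFlags(String version)
        {
            // Version strings sort lexicographically, so one comparison
            // cleanly splits old-format from new-format sstables.
            hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
        }

        public static void main(String[] args)
        {
            assert new VersionFlags("ib").hasRowSizeAndColumnCount;  // legacy layout
            assert !new VersionFlags("ja").hasRowSizeAndColumnCount; // 2.0 layout, CASSANDRA-4180
        }
    }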

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
deleted file mode 100644
index 4febea6..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * A SSTableScanner that only reads key in a given range (for validation compaction).
- */
-public class SSTableBoundedScanner extends SSTableScanner
-{
-    private final Iterator<Pair<Long, Long>> rangeIterator;
-    private Pair<Long, Long> currentRange;
-
-    SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter)
-    {
-        super(sstable, limiter);
-        assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
-        this.rangeIterator = rangeIterator;
-        currentRange = rangeIterator.next();
-        dfile.seek(currentRange.left);
-    }
-
-    /*
-     * This shouldn't be used with a bounded scanner as it could put the
-     * bounded scanner outside its range.
-     */
-    @Override
-    public void seekTo(RowPosition seekKey)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
-        return iterator.hasNext();
-    }
-
-    @Override
-    public OnDiskAtomIterator next()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
-        return iterator.next();
-    }
-
-    protected class BoundedKeyScanningIterator extends KeyScanningIterator
-    {
-        @Override
-        public boolean hasNext()
-        {
-            if (!super.hasNext())
-                return false;
-
-            if (finishedAt < currentRange.right)
-                return true;
-
-            if (rangeIterator.hasNext())
-            {
-                currentRange = rangeIterator.next();
-                finishedAt = currentRange.left; // next() will seek for us
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-        }
-    }
-}

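The bounded scanner's job does not disappear: as the SSTableScanner diff further down shows, range-bounded scanning is folded into the main scanner via a stopAt position. A minimal sketch of that idea, with plain long offsets standing in for RowIndexEntry (the stopAt value would come from the index lookup for the range's right bound):

    import com.google.common.collect.AbstractIterator;

    // Toy stand-in for KeyScanningIterator: walk sorted row positions and
    // stop at the first one at or past stopAt, instead of juggling sub-ranges.
    class StopAtIterator extends AbstractIterator<Long>
    {
        private final long[] positions; // sorted row start offsets
        private final long stopAt;      // position of the first row past the range
        private int i;

        StopAtIterator(long[] positions, long stopAt)
        {
            this.positions = positions;
            this.stopAt = stopAt;
        }

        protected Long computeNext()
        {
            if (i == positions.length || positions[i] >= stopAt)
                return endOfData();
            return positions[i++];
        }
    }
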
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 8e636c5..d35e14d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -25,30 +25,25 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.BytesReadTracker;
 
-public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, ICountableColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
 
     private final DecoratedKey key;
-    private final DataInput input;
-    private final long dataStart;
-    public final long dataSize;
+    private final DataInput in;
+    public final long dataSize; // we [still] require this so compaction can tell if it's safe to read the row into memory
     public final ColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
     private final int columnCount;
-    private final long columnPosition;
 
     private final Iterator<OnDiskAtom> atomIterator;
     private final Descriptor.Version dataVersion;
 
-    private final BytesReadTracker inputWithTracker; // tracks bytes read
-
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
     private final int expireBefore;
 
@@ -60,13 +55,12 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
      * @param sstable SSTable we are reading from.
      * @param file file to read from.
      * @param key Key of this row.
-     * @param dataStart Data for this row starts at this pos.
      * @param dataSize length of row data
      * @throws IOException
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize)
     {
-        this(sstable, file, key, dataStart, dataSize, false);
+        this(sstable, file, key, dataSize, false);
     }
 
     /**
@@ -74,39 +68,29 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
      * @param sstable SSTable we are reading from.
      * @param file file to read from.
      * @param key Key of this row.
-     * @param dataStart Data for this row starts at this pos.
      * @param dataSize length of row data
      * @param checkData if true, do its best to deserialize and check the coherence of row data
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
     {
-        this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
-    }
-
-    // Must only be used against current file format
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, ColumnSerializer.Flag flag)
-    {
-        this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
+        this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
     }
 
     // sstable may be null *if* checkData is false
     // If it is null, we assume the data is in the current file format
     private SSTableIdentityIterator(CFMetaData metadata,
-                                    DataInput input,
+                                    DataInput in,
                                     String filename,
                                     DecoratedKey key,
-                                    long dataStart,
                                     long dataSize,
                                     boolean checkData,
                                     SSTableReader sstable,
                                     ColumnSerializer.Flag flag)
     {
         assert !checkData || (sstable != null);
-        this.input = input;
+        this.in = in;
         this.filename = filename;
-        this.inputWithTracker = new BytesReadTracker(input);
         this.key = key;
-        this.dataStart = dataStart;
         this.dataSize = dataSize;
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         this.flag = flag;
@@ -115,21 +99,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
 
         try
         {
-            if (input instanceof RandomAccessReader)
-            {
-                RandomAccessReader file = (RandomAccessReader) input;
-                file.seek(this.dataStart);
-                if (dataStart + dataSize > file.length())
-                    throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s",
-                                          dataSize, dataStart, file.getPath(), file.length()));
-            }
-
             columnFamily = EmptyColumns.factory.create(metadata);
-            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
-
-            columnCount = inputWithTracker.readInt();
-            atomIterator = columnFamily.metadata().getOnDiskIterator(inputWithTracker, columnCount, dataVersion);
-            columnPosition = dataStart + inputWithTracker.getBytesRead();
+            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(in, dataVersion));
+            columnCount = dataVersion.hasRowSizeAndColumnCount ? in.readInt() : Integer.MAX_VALUE;
+            atomIterator = columnFamily.metadata().getOnDiskIterator(in, columnCount, dataVersion);
         }
         catch (IOException e)
         {
@@ -151,7 +124,18 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
 
     public boolean hasNext()
     {
-        return inputWithTracker.getBytesRead() < dataSize;
+        try
+        {
+            return atomIterator.hasNext();
+        }
+        catch (IOError e)
+        {
+            // catch here because atomIterator is an AbstractIterator: hasNext() performs the actual read
+            if (e.getCause() instanceof IOException)
+                throw new CorruptSSTableException((IOException)e.getCause(), filename);
+            else
+                throw e;
+        }
     }
 
     public OnDiskAtom next()
@@ -163,13 +147,6 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                 atom.validateFields(columnFamily.metadata());
             return atom;
         }
-        catch (IOError e)
-        {
-            if (e.getCause() instanceof IOException)
-                throw new CorruptSSTableException((IOException)e.getCause(), filename);
-            else
-                throw e;
-        }
         catch (MarshalException me)
         {
             throw new CorruptSSTableException(me, filename);
@@ -189,9 +166,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     public String getPath()
     {
         // if input is from file, then return that path, otherwise it's from streaming
-        if (input instanceof RandomAccessReader)
+        if (in instanceof RandomAccessReader)
         {
-            RandomAccessReader file = (RandomAccessReader) input;
+            RandomAccessReader file = (RandomAccessReader) in;
             return file.getPath();
         }
         else
@@ -202,10 +179,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
 
     public ColumnFamily getColumnFamilyWithColumns(ColumnFamily.Factory containerFactory)
     {
-        assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
         // since we already read column count, just pass that value and continue deserialization
-        columnFamily.serializer.deserializeColumnsFromSSTable(inputWithTracker, cf, columnCount, flag, expireBefore, dataVersion);
+        columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
         if (validateColumns)
         {
             try
@@ -220,28 +196,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         return cf;
     }
 
-    private long headerSize()
-    {
-        return columnPosition - dataStart;
-    }
-
     public int compareTo(SSTableIdentityIterator o)
     {
         return key.compareTo(o.key);
     }
 
-    public void reset()
-    {
-        if (!(input instanceof RandomAccessReader))
-            throw new UnsupportedOperationException();
-
-        RandomAccessReader file = (RandomAccessReader) input;
-        file.seek(columnPosition);
-        inputWithTracker.reset(headerSize());
-    }
-
-    public int getColumnCount()
-    {
-        return columnCount;
-    }
 }

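Two things in this file are easy to miss. First, when hasRowSizeAndColumnCount is false the column count becomes Integer.MAX_VALUE, so termination has to come from the data itself rather than from a count read up front. Second, the IOError handling moves from next() to hasNext() because with a Guava AbstractIterator it is hasNext() that performs the read-ahead. A hypothetical sentinel-terminated reader makes both points concrete (the zero length standing in for an end-of-row marker is illustrative, not the actual atom serializer):

    import java.io.DataInput;
    import java.io.IOError;
    import java.io.IOException;
    import com.google.common.collect.AbstractIterator;

    // Hypothetical reader: consumes length-prefixed cells until a zero length.
    class SentinelReadingIterator extends AbstractIterator<Integer>
    {
        private final DataInput in;

        SentinelReadingIterator(DataInput in)
        {
            this.in = in;
        }

        protected Integer computeNext()
        {
            try
            {
                int length = in.readUnsignedShort();
                if (length == 0)      // end-of-row sentinel: no count needed up front
                    return endOfData();
                in.skipBytes(length); // a real reader would deserialize the cell
                return length;
            }
            catch (IOException e)
            {
                // computeNext() cannot throw checked exceptions, hence the IOError
                // that the caller's hasNext() unwraps into CorruptSSTableException
                throw new IOError(e);
            }
        }
    }
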
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a09f65b..baba472 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1000,10 +1000,15 @@ public class SSTableReader extends SSTable
      */
     public SSTableScanner getScanner(QueryFilter filter)
     {
-        return new SSTableScanner(this, filter);
+        return new SSTableScanner(this, filter, null);
     }
 
-   /**
+    public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
+    {
+        return new SSTableScanner(this, filter, startWith, null);
+    }
+
+    /**
     * I/O SSTableScanner
     * @return A Scanner for seeking over the rows of the SSTable.
     */
@@ -1014,7 +1019,7 @@ public class SSTableReader extends SSTable
 
    public SSTableScanner getScanner(RateLimiter limiter)
    {
-       return new SSTableScanner(this, limiter);
+       return new SSTableScanner(this, null, limiter);
    }
 
    /**
@@ -1030,7 +1035,7 @@ public class SSTableReader extends SSTable
 
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         if (rangeIterator.hasNext())
-            return new SSTableBoundedScanner(this, rangeIterator, limiter);
+            return new SSTableScanner(this, null, range, limiter);
         else
             return new EmptyCompactionScanner(getFilename());
     }

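Callers are unchanged in shape: a scanner is drained and closed the same way whether it came from the plain, seek-to-key, or range factory. A hedged usage sketch (assuming an already-opened SSTableReader; error handling trimmed, method and class names illustrative):

    import java.io.IOException;
    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
    import org.apache.cassandra.io.sstable.SSTableReader;
    import org.apache.cassandra.io.sstable.SSTableScanner;

    class ScannerDrainSketch
    {
        static void drain(SSTableReader reader) throws IOException
        {
            SSTableScanner scanner = reader.getScanner((RateLimiter) null); // no throttling
            try
            {
                while (scanner.hasNext())
                {
                    OnDiskAtomIterator row = scanner.next();
                    while (row.hasNext())
                        row.next(); // a real caller would process each OnDiskAtom
                }
            }
            finally
            {
                scanner.close();
            }
        }
    }
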
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 38b9e65..fb52a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,11 +18,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -31,6 +32,8 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,77 +43,83 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
-    private OnDiskAtomIterator row;
-    protected boolean exhausted = false;
     protected Iterator<OnDiskAtomIterator> iterator;
     private final QueryFilter filter;
+    private long stopAt;
 
     /**
-     * @param sstable SSTable to scan.
-     * @param limiter
+     * @param sstable SSTable to scan; must not be null
+     * @param filter filter to use when scanning the columns; may be null
+     * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, RateLimiter limiter)
+    SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
     {
-        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
-        this.ifile = sstable.openIndexReader();
-        this.sstable = sstable;
-        this.filter = null;
-    }
+        assert sstable != null;
 
-    /**
-     * @param sstable SSTable to scan.
-     * @param filter filter to use when scanning the columns
-     */
-    SSTableScanner(SSTableReader sstable, QueryFilter filter)
-    {
-        this.dfile = sstable.openDataReader();
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
         this.filter = filter;
+        stopAt = dfile.length();
     }
 
-    public void close() throws IOException
+    public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
     {
-        FileUtils.close(dfile, ifile);
-    }
+        this(sstable, filter, limiter);
+
+        long indexPosition = sstable.getIndexScanPosition(startWith);
+        // -1 means the key is before everything in the sstable. So just start from the beginning.
+        if (indexPosition == -1)
+            indexPosition = 0;
+        ifile.seek(indexPosition);
 
-    public void seekTo(RowPosition seekKey)
-    {
         try
         {
-            long indexPosition = sstable.getIndexScanPosition(seekKey);
-            // -1 means the key is before everything in the sstable. So just start from the beginning.
-            if (indexPosition == -1)
-                indexPosition = 0;
-
-            ifile.seek(indexPosition);
-
             while (!ifile.isEOF())
             {
-                long startPosition = ifile.getFilePointer();
+                indexPosition = ifile.getFilePointer();
                 DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(seekKey);
+                int comparison = indexDecoratedKey.compareTo(startWith);
                 if (comparison >= 0)
                 {
                     // Found, just read the dataPosition and seek into index and data files
                     long dataPosition = ifile.readLong();
-                    ifile.seek(startPosition);
+                    ifile.seek(indexPosition);
                     dfile.seek(dataPosition);
-                    row = null;
-                    return;
+                    break;
                 }
                 else
                 {
                     RowIndexEntry.serializer.skip(ifile);
                 }
             }
-            exhausted = true;
         }
         catch (IOException e)
         {
             sstable.markSuspect();
-            throw new CorruptSSTableException(e, ifile.getPath());
+            throw new CorruptSSTableException(e, sstable.getFilename());
         }
+    }
+
+    public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+    {
+        this(sstable, filter, range.toRowBounds().left, limiter);
+
+        if (range.isWrapAround())
+        {
+            stopAt = dfile.length();
+        }
+        else
+        {
+            RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
+            stopAt = position == null ? dfile.length() : position.position;
+        }
+    }
+
+    public void close() throws IOException
+    {
+        FileUtils.close(dfile, ifile);
     }
 
     public long getLengthInBytes()
@@ -131,14 +140,14 @@ public class SSTableScanner implements ICompactionScanner
     public boolean hasNext()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+            iterator = createIterator();
         return iterator.hasNext();
     }
 
     public OnDiskAtomIterator next()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+            iterator = createIterator();
         return iterator.next();
     }
 
@@ -149,76 +158,24 @@ public class SSTableScanner implements ICompactionScanner
 
     private Iterator<OnDiskAtomIterator> createIterator()
     {
-        return filter == null ? new KeyScanningIterator() : new FilteredKeyScanningIterator();
-    }
-
-    protected class KeyScanningIterator implements Iterator<OnDiskAtomIterator>
-    {
-        protected long finishedAt;
-
-        public boolean hasNext()
-        {
-            if (row == null)
-                return !dfile.isEOF();
-            return finishedAt < dfile.length();
-        }
-
-        public OnDiskAtomIterator next()
-        {
-            try
-            {
-                if (row != null)
-                    dfile.seek(finishedAt);
-                assert !dfile.isEOF();
-
-                // Read data header
-                DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dfile));
-                long dataSize = dfile.readLong();
-                long dataStart = dfile.getFilePointer();
-                finishedAt = dataStart + dataSize;
-
-                row = new SSTableIdentityIterator(sstable, dfile, key, dataStart, dataSize);
-                return row;
-            }
-            catch (IOException e)
-            {
-                sstable.markSuspect();
-                throw new CorruptSSTableException(e, dfile.getPath());
-            }
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String toString()
-        {
-            return getClass().getSimpleName() + "(" + "finishedAt:" + finishedAt + ")";
-        }
+        return new KeyScanningIterator();
     }
 
-    protected class FilteredKeyScanningIterator implements Iterator<OnDiskAtomIterator>
+    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
     {
-        protected DecoratedKey nextKey;
-        protected RowIndexEntry nextEntry;
-
-        public boolean hasNext()
-        {
-            if (row == null)
-                return !ifile.isEOF();
-            return nextKey != null;
-        }
+        private DecoratedKey nextKey;
+        private RowIndexEntry nextEntry;
+        private DecoratedKey currentKey;
+        private RowIndexEntry currentEntry;
 
-        public OnDiskAtomIterator next()
+        protected OnDiskAtomIterator computeNext()
         {
             try
             {
-                final DecoratedKey currentKey;
-                final RowIndexEntry currentEntry;
+                if (ifile.isEOF() && nextKey == null)
+                    return endOfData();
 
-                if (row == null)
+                if (currentKey == null)
                 {
                     currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                     currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
@@ -229,6 +186,10 @@ public class SSTableScanner implements ICompactionScanner
                     currentEntry = nextEntry;
                 }
 
+                assert currentEntry.position <= stopAt;
+                if (currentEntry.position == stopAt)
+                    return endOfData();
+
                 if (ifile.isEOF())
                 {
                     nextKey = null;
@@ -241,7 +202,18 @@ public class SSTableScanner implements ICompactionScanner
                 }
 
                 assert !dfile.isEOF();
-                return row = new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+
+                if (filter == null)
+                {
+                    dfile.seek(currentEntry.position);
+                    ByteBufferUtil.readWithShortLength(dfile); // key
+                    if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                        dfile.readLong();
+                    long dataSize = (nextEntry == null ? dfile.length() : nextEntry.position) - dfile.getFilePointer();
+                    return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
+                }
+
+                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
                 {
                     public OnDiskAtomIterator create()
                     {
@@ -252,14 +224,9 @@ public class SSTableScanner implements ICompactionScanner
             catch (IOException e)
             {
                 sstable.markSuspect();
-                throw new CorruptSSTableException(e, ifile.getPath());
+                throw new CorruptSSTableException(e, sstable.getFilename());
             }
         }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
     }
 
     @Override
@@ -269,7 +236,6 @@ public class SSTableScanner implements ICompactionScanner
                "dfile=" + dfile +
                " ifile=" + ifile +
                " sstable=" + sstable +
-               " exhausted=" + exhausted +
                ")";
     }
 }
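
Finally, the line worth staring at is the dataSize computation in computeNext(): with the size header gone from the data file, a row's length is the gap between its position and the next index entry's position (or end of file for the last row). A standalone sketch of that arithmetic, with plain longs standing in for RowIndexEntry:

    // Toy version of the dataSize arithmetic in KeyScanningIterator above.
    public class RowLengthSketch
    {
        static long rowDataSize(long[] rowPositions, int row, long fileLength, long keyBytes)
        {
            long start = rowPositions[row] + keyBytes; // skip the serialized key
            long end = (row + 1 < rowPositions.length) ? rowPositions[row + 1] : fileLength;
            return end - start;
        }

        public static void main(String[] args)
        {
            long[] positions = { 0, 100, 250 }; // hypothetical row start offsets
            System.out.println(rowDataSize(positions, 0, 400, 10)); // 90
            System.out.println(rowDataSize(positions, 2, 400, 10)); // 140
        }
    }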