Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:40 UTC

[16/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 1458461..a38e60f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -41,10 +41,9 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.dht.*;
@@ -199,6 +198,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     // not final since we need to be able to change level on a file.
     protected volatile StatsMetadata sstableMetadata;
 
+    public final SerializationHeader header;
+
     protected final AtomicLong keyCacheHit = new AtomicLong(0);
     protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 
@@ -331,7 +332,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             String parentName = descriptor.cfname.substring(0, i);
             CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
             ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
-            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
+            metadata = SecondaryIndex.newIndexMetadata(parent, def);
         }
         else
         {
@@ -375,10 +376,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 
-        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
-                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types);
+
         ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
         StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+        SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
 
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
@@ -392,8 +395,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
 
         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
-                statsMetadata, OpenReason.NORMAL);
+        SSTableReader sstable = internalOpen(descriptor,
+                                             components,
+                                             metadata,
+                                             partitioner,
+                                             System.currentTimeMillis(),
+                                             statsMetadata,
+                                             OpenReason.NORMAL,
+                                             header.toHeader(metadata));
 
         // special implementation of load to use non-pooled SegmentedFile builders
         try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
@@ -421,10 +430,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 
-        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
-                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        // For the 3.0+ sstable format, the (misnamed) stats component holds the serialization header, which we need in order to deserialize the sstable content
+        assert !descriptor.version.storeRows() || components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor;
+
+        EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types);
         ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
         StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+        SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
+        assert !descriptor.version.storeRows() || header != null;
 
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
@@ -438,8 +452,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
 
         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
-                statsMetadata, OpenReason.NORMAL);
+        SSTableReader sstable = internalOpen(descriptor,
+                                             components,
+                                             metadata,
+                                             partitioner,
+                                             System.currentTimeMillis(),
+                                             statsMetadata,
+                                             OpenReason.NORMAL,
+                                             header == null ? null : header.toHeader(metadata));
 
         // load index and filter
         long start = System.nanoTime();
@@ -520,11 +540,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                       IFilter bf,
                                       long maxDataAge,
                                       StatsMetadata sstableMetadata,
-                                      OpenReason openReason)
+                                      OpenReason openReason,
+                                      SerializationHeader header)
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 
-        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
 
         reader.bf = bf;
         reader.ifile = ifile;
@@ -542,11 +563,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                             IPartitioner partitioner,
                                             Long maxDataAge,
                                             StatsMetadata sstableMetadata,
-                                            OpenReason openReason)
+                                            OpenReason openReason,
+                                            SerializationHeader header)
     {
         Factory readerFactory = descriptor.getFormat().getReaderFactory();
 
-        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
     }
 
     protected SSTableReader(final Descriptor desc,
@@ -555,13 +577,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                             IPartitioner partitioner,
                             long maxDataAge,
                             StatsMetadata sstableMetadata,
-                            OpenReason openReason)
+                            OpenReason openReason,
+                            SerializationHeader header)
     {
         super(desc, components, metadata, partitioner);
         this.sstableMetadata = sstableMetadata;
+        this.header = header;
         this.maxDataAge = maxDataAge;
         this.openReason = openReason;
-        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, desc.version, header);
     }
 
     public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -736,12 +760,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
             {
                 long indexPosition;
-                RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
+                RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header);
 
                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
                     ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                    RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
+                    RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
                     DecoratedKey decoratedKey = partitioner.decorateKey(key);
                     if (first == null)
                         first = decoratedKey;
@@ -979,7 +1003,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                                  bf.sharedCopy(),
                                                  maxDataAge,
                                                  sstableMetadata,
-                                                 reason);
+                                                 reason,
+                                                 header);
         replacement.first = newFirst;
         replacement.last = last;
         replacement.isSuspect.set(isSuspect.get());
@@ -1140,7 +1165,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
      * modulo downsampling of the index summary). Always returns a value >= 0
      */
-    public long getIndexScanPosition(RowPosition key)
+    public long getIndexScanPosition(PartitionPosition key)
     {
         if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0)
             key = first;
@@ -1287,8 +1312,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
         for (Range<Token> range : Range.normalize(ranges))
         {
-            RowPosition leftPosition = range.left.maxKeyBound();
-            RowPosition rightPosition = range.right.maxKeyBound();
+            PartitionPosition leftPosition = range.left.maxKeyBound();
+            PartitionPosition rightPosition = range.right.maxKeyBound();
 
             int left = summary.binarySearch(leftPosition);
             if (left < 0)
@@ -1382,9 +1407,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             assert !range.isWrapAround() || range.right.isMinimum();
             // truncate the range so it at most covers the sstable
-            AbstractBounds<RowPosition> bounds = Range.makeRowRange(range);
-            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
-            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
+            AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range);
+            PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
+            PartitionPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
 
             if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
                 continue;
@@ -1455,14 +1480,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     /**
      * Get position updating key cache and stats.
-     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
+     * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean)
      */
-    public RowIndexEntry getPosition(RowPosition key, Operator op)
+    public RowIndexEntry getPosition(PartitionPosition key, Operator op)
     {
         return getPosition(key, op, true, false);
     }
 
-    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
+    public RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats)
     {
         return getPosition(key, op, updateCacheAndStats, false);
     }
@@ -1473,20 +1498,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * @param updateCacheAndStats true if updating stats and cache
      * @return The index entry corresponding to the key, or null if the key is not present
      */
-    protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
+    protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
 
-    //Corresponds to a name column
-    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
-    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
-
-    //Corresponds to a slice query
-    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
-    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
+    public abstract SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
+    public abstract SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
 
     /**
      * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
      */
-    public DecoratedKey firstKeyBeyond(RowPosition token)
+    public DecoratedKey firstKeyBeyond(PartitionPosition token)
     {
         if (token.compareTo(first) < 0)
             return first;
@@ -1589,19 +1609,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return getScanner((RateLimiter) null);
     }
 
-    public ISSTableScanner getScanner(RateLimiter limiter)
-    {
-        return getScanner(DataRange.allData(partitioner), limiter);
-    }
-
     /**
-     *
+     * @param columns the columns to return.
      * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public ISSTableScanner getScanner(DataRange dataRange)
+    public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift)
     {
-        return getScanner(dataRange, null);
+        return getScanner(columns, dataRange, null, isForThrift);
     }
 
     /**
@@ -1618,6 +1633,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     /**
+     * Direct I/O SSTableScanner over the entirety of the sstable.
+     *
+     * @return A Scanner over the full content of the SSTable.
+     */
+    public abstract ISSTableScanner getScanner(RateLimiter limiter);
+
+    /**
      * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
      *
      * @param ranges the range of keys to cover
@@ -1626,11 +1648,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 
     /**
-     *
+     * @param columns the columns to return.
      * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
+    public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift);
 
 
 
@@ -1761,6 +1783,43 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return sstableMetadata.maxTimestamp;
     }
 
+    public int getMinLocalDeletionTime()
+    {
+        return sstableMetadata.minLocalDeletionTime;
+    }
+
+    public int getMaxLocalDeletionTime()
+    {
+        return sstableMetadata.maxLocalDeletionTime;
+    }
+
+    public int getMinTTL()
+    {
+        return sstableMetadata.minTTL;
+    }
+
+    public int getMaxTTL()
+    {
+        return sstableMetadata.maxTTL;
+    }
+
+    public long getTotalColumnsSet()
+    {
+        return sstableMetadata.totalColumnsSet;
+    }
+
+    public long getTotalRows()
+    {
+        return sstableMetadata.totalRows;
+    }
+
+    public int getAvgColumnSetPerRow()
+    {
+        return sstableMetadata.totalRows < 0
+             ? -1
+             : (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows));
+    }
+
     public Set<Integer> getAncestors()
     {
         try
@@ -1850,6 +1909,34 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             readMeter.mark();
     }
 
+    /**
+     * Checks if this sstable can overlap with another one based on the min/max clustering values.
+     * If this method returns false, we're guaranteed that {@code this} and {@code other} have no overlapping
+     * data, i.e. no cells to reconcile.
+     */
+    public boolean mayOverlapsWith(SSTableReader other)
+    {
+        StatsMetadata m1 = getSSTableMetadata();
+        StatsMetadata m2 = other.getSSTableMetadata();
+
+        if (m1.minClusteringValues.isEmpty() || m1.maxClusteringValues.isEmpty() || m2.minClusteringValues.isEmpty() || m2.maxClusteringValues.isEmpty())
+            return true;
+
+        return !(compare(m1.maxClusteringValues, m2.minClusteringValues) < 0 || compare(m1.minClusteringValues, m2.maxClusteringValues) > 0);
+    }
+
+    private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2)
+    {
+        ClusteringComparator comparator = metadata.comparator;
+        for (int i = 0; i < Math.min(values1.size(), values2.size()); i++)
+        {
+            int cmp = comparator.subtype(i).compare(values1.get(i), values2.get(i));
+            if (cmp != 0)
+                return cmp;
+        }
+        return 0;
+    }
+
     public static class SizeComparator implements Comparator<SSTableReader>
     {
         public int compare(SSTableReader o1, SSTableReader o2)
@@ -2172,7 +2259,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                            IPartitioner partitioner,
                                            Long maxDataAge,
                                            StatsMetadata sstableMetadata,
-                                           OpenReason openReason);
+                                           OpenReason openReason,
+                                           SerializationHeader header);
 
     }
 }

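The new mayOverlapsWith/compare pair above is a plain interval-overlap test over the per-component min/max clustering bounds recorded in StatsMetadata: two sstables cannot share data when one's max bound sorts strictly before the other's min bound. A self-contained sketch of the same logic, using Integer components as stand-ins for the ByteBuffer values and ClusteringComparator of the real code:

    import java.util.List;

    public class OverlapSketch
    {
        static boolean mayOverlap(List<Integer> min1, List<Integer> max1,
                                  List<Integer> min2, List<Integer> max2)
        {
            // Mirrors mayOverlapsWith: no overlap iff one table's max bound sorts
            // strictly before the other's min bound. (The real method first answers
            // true conservatively when any bound list is empty.)
            return !(compare(max1, min2) < 0 || compare(min1, max2) > 0);
        }

        static int compare(List<Integer> a, List<Integer> b)
        {
            // Lexicographic, component-by-component, like the private compare()
            // above; a shared prefix compares equal.
            for (int i = 0; i < Math.min(a.size(), b.size()); i++)
            {
                int cmp = Integer.compare(a.get(i), b.get(i));
                if (cmp != 0)
                    return cmp;
            }
            return 0;
        }

        public static void main(String[] args)
        {
            System.out.println(mayOverlap(List.of(0), List.of(5), List.of(6), List.of(9))); // false: disjoint
            System.out.println(mayOverlap(List.of(0), List.of(5), List.of(3), List.of(9))); // true: [3,5] shared
        }
    }
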
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index f99292e..c3c69b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -18,14 +18,19 @@
 
 package org.apache.cassandra.io.sstable.format;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import com.google.common.collect.Sets;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -37,13 +42,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * This is the API all table writers must implement.
  *
@@ -57,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected final long keyCount;
     protected final MetadataCollector metadataCollector;
     protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    protected final SerializationHeader header;
     protected final TransactionalProxy txnProxy = txnProxy();
 
     protected abstract TransactionalProxy txnProxy();
@@ -69,30 +68,37 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         protected boolean openResult;
     }
 
-    protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+    protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
     {
         super(descriptor, components(metadata), metadata, partitioner);
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;
-        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+        this.header = header;
+        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
     }
 
-    public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata,  IPartitioner partitioner, MetadataCollector metadataCollector)
+    public static SSTableWriter create(Descriptor descriptor,
+                                       Long keyCount,
+                                       Long repairedAt,
+                                       CFMetaData metadata,
+                                       IPartitioner partitioner,
+                                       MetadataCollector metadataCollector,
+                                       SerializationHeader header)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
     }
 
-    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt)
+    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, SerializationHeader header)
     {
-        return create(descriptor, keyCount, repairedAt, 0);
+        return create(descriptor, keyCount, repairedAt, 0, header);
     }
 
-    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel)
+    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner());
+        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header);
     }
 
     public static SSTableWriter create(CFMetaData metadata,
@@ -100,20 +106,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long keyCount,
                                        long repairedAt,
                                        int sstableLevel,
-                                       IPartitioner partitioner)
+                                       IPartitioner partitioner,
+                                       SerializationHeader header)
     {
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector);
+        return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header);
     }
 
-    public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel)
+    public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel);
+        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header);
     }
 
-    public static SSTableWriter create(String filename, long keyCount, long repairedAt)
+    public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0);
+        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header);
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -141,19 +148,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         return components;
     }
 
-
     public abstract void mark();
 
-
     /**
-     * @param row
-     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     * Appends partition data to this writer.
+     *
+     * @param iterator the partition to write
+     * @return the created index entry if something was written, that is, if {@code iterator}
+     * wasn't empty, {@code null} otherwise.
+     *
+     * @throws FSWriteError if a write to the dataFile fails
      */
-    public abstract RowIndexEntry append(AbstractCompactedRow row);
-
-    public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);
-
-    public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
+    public abstract RowIndexEntry append(UnfilteredRowIterator iterator);
 
     public abstract long getFilePointer();
 
@@ -244,7 +250,9 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected Map<MetadataType, MetadataComponent> finalizeMetadata()
     {
         return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
-                                                  metadata.getBloomFilterFpChance(), repairedAt);
+                                                  metadata.getBloomFilterFpChance(),
+                                                  repairedAt,
+                                                  header);
     }
 
     protected StatsMetadata statsMetadata()
@@ -276,6 +284,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
 
     public static abstract class Factory
     {
-        public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
+        public abstract SSTableWriter open(Descriptor descriptor,
+                                           long keyCount,
+                                           long repairedAt,
+                                           CFMetaData metadata,
+                                           IPartitioner partitioner,
+                                           MetadataCollector metadataCollector,
+                                           SerializationHeader header);
     }
 }

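All SSTableWriter.create() overloads above now thread a SerializationHeader down to the format's writer factory, and the three old append variants collapse into a single append(UnfilteredRowIterator) that returns null for an empty partition. A minimal stand-in model of that contract (illustrative types only, not Cassandra classes):

    import java.util.Iterator;
    import java.util.List;

    public class WriterContractSketch
    {
        static final class SerializationHeader {}
        static final class RowIndexEntry { final long position; RowIndexEntry(long p) { position = p; } }

        static final class Writer
        {
            final SerializationHeader header; // mandatory at creation time, as in the diff
            long filePointer;
            Writer(SerializationHeader header) { this.header = header; }

            // Mirrors the new contract: null when the partition iterator is empty.
            RowIndexEntry append(Iterator<String> partition)
            {
                if (!partition.hasNext())
                    return null;
                long start = filePointer;
                while (partition.hasNext())
                    filePointer += partition.next().length(); // stand-in for row serialization
                return new RowIndexEntry(start);
            }
        }

        public static void main(String[] args)
        {
            Writer writer = new Writer(new SerializationHeader());
            System.out.println(writer.append(List.<String>of().iterator()));        // null: nothing written
            System.out.println(writer.append(List.of("row1").iterator()).position); // 0
        }
    }
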
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index faaa89e..8077a45 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -52,6 +52,10 @@ public abstract class Version
 
     public abstract boolean hasNewFileName();
 
+    public abstract boolean storeRows();
+
+    public abstract int correspondingMessagingVersion(); // Only used by storage that 'storeRows' so far
+
     public String getVersion()
     {
         return version;
@@ -73,6 +77,7 @@ public abstract class Version
     }
 
     abstract public boolean isCompatible();
+    abstract public boolean isCompatibleForStreaming();
 
     @Override
     public String toString()

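Version gains two hooks here: storeRows() tells callers whether an sstable uses the new row-based storage, and correspondingMessagingVersion() names the serialization version to use when reading it. A self-contained sketch of the intended consumption, with the MessagingService constants assumed (VERSION_21 = 8 and VERSION_30 = 10 do not appear in this diff):

    public class VersionHooksSketch
    {
        static final int VERSION_21 = 8, VERSION_30 = 10; // assumed MessagingService values

        // Mirrors BigVersion further down: row-storing ("la"+) sstables pair with
        // the 3.0 messaging version, legacy ones with 2.1's.
        static int correspondingMessagingVersion(boolean storeRows)
        {
            return storeRows ? VERSION_30 : VERSION_21;
        }

        public static void main(String[] args)
        {
            System.out.println(correspondingMessagingVersion(true));  // 10
            System.out.println(correspondingMessagingVersion(false)); // 8
        }
    }
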
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a1e32cf..fd0b5d5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -17,16 +17,14 @@
  */
 package org.apache.cassandra.io.sstable.format.big;
 
+import java.util.Iterator;
+import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.AbstractCell;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -38,9 +36,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
-
-import java.util.Iterator;
-import java.util.Set;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * Legacy bigtable format
@@ -82,38 +78,26 @@ public class BigFormat implements SSTableFormat
     }
 
     @Override
-    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version)
-    {
-        return AbstractCell.onDiskIterator(in, flag, expireBefore, version, cfm.comparator);
-    }
-
-    @Override
-    public AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators)
-    {
-        return new LazilyCompactedRow(controller, onDiskAtomIterators);
-    }
-
-    @Override
-    public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData cfMetaData)
+    public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header)
     {
-        return new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(cfMetaData.comparator));
+        return new RowIndexEntry.Serializer(metadata, version, header);
     }
 
     static class WriterFactory extends SSTableWriter.Factory
     {
         @Override
-        public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+        public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
         }
     }
 
     static class ReaderFactory extends SSTableReader.Factory
     {
         @Override
-        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason)
+        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
         {
-            return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+            return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
         }
     }
 
@@ -143,6 +127,8 @@ public class BigFormat implements SSTableFormat
         private final boolean hasRepairedAt;
         private final boolean tracksLegacyCounterShards;
         private final boolean newFileName;
+        public final boolean storeRows;
+        public final int correspondingMessagingVersion; // Only used by storage that 'storeRows' so far
 
         public BigVersion(String version)
         {
@@ -155,6 +141,8 @@ public class BigFormat implements SSTableFormat
             hasRepairedAt = version.compareTo("ka") >= 0;
             tracksLegacyCounterShards = version.compareTo("ka") >= 0;
             newFileName = version.compareTo("la") >= 0;
+            storeRows = version.compareTo("la") >= 0;
+            correspondingMessagingVersion = storeRows ? MessagingService.VERSION_30 : MessagingService.VERSION_21;
         }
 
         @Override
@@ -200,9 +188,27 @@ public class BigFormat implements SSTableFormat
         }
 
         @Override
+        public boolean storeRows()
+        {
+            return storeRows;
+        }
+
+        @Override
+        public int correspondingMessagingVersion()
+        {
+            return correspondingMessagingVersion;
+        }
+
+        @Override
         public boolean isCompatible()
         {
             return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
         }
+
+        @Override
+        public boolean isCompatibleForStreaming()
+        {
+            return isCompatible() && version.charAt(0) == current_version.charAt(0);
+        }
     }
 }

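BigVersion derives all of its feature flags, including the new storeRows, from a lexicographic comparison of the two-letter version string, and the new isCompatibleForStreaming() additionally requires the same major format letter. A runnable sketch of exactly that arithmetic (the EARLIEST/CURRENT values are assumptions, not taken from this diff):

    public class BigVersionFlagsSketch
    {
        static final String EARLIEST = "jb", CURRENT = "la"; // assumed constants

        static boolean storeRows(String v)
        {
            return v.compareTo("la") >= 0; // as in the BigVersion constructor above
        }

        static boolean isCompatible(String v)
        {
            return v.compareTo(EARLIEST) >= 0 && v.charAt(0) <= CURRENT.charAt(0);
        }

        static boolean isCompatibleForStreaming(String v)
        {
            // being readable is not enough for streaming; the major letter must match
            return isCompatible(v) && v.charAt(0) == CURRENT.charAt(0);
        }

        public static void main(String[] args)
        {
            System.out.println(storeRows("ka"));                // false: 2.1-era format
            System.out.println(isCompatibleForStreaming("ka")); // false: readable only
            System.out.println(isCompatibleForStreaming("la")); // true
        }
    }
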
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 3f375e7..7a7b913 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -20,13 +20,11 @@ package org.apache.cassandra.io.sstable.format.big;
 import com.google.common.util.concurrent.RateLimiter;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.columniterator.SSTableIterator;
+import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -37,7 +35,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
@@ -55,41 +52,45 @@ public class BigTableReader extends SSTableReader
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 
-    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
+    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
     {
-        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
     }
 
-    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
+    public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
     {
-        return new SSTableNamesIterator(this, key, columns);
+        return reversed
+             ? new SSTableReversedIterator(this, key, selectedColumns, isForThrift)
+             : new SSTableIterator(this, key, selectedColumns, isForThrift);
     }
 
-    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
+    public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
     {
-        return new SSTableNamesIterator(this, input, key, columns, indexEntry);
+        return reversed
+             ? new SSTableReversedIterator(this, file, key, indexEntry, selectedColumns, isForThrift)
+             : new SSTableIterator(this, file, key, indexEntry, selectedColumns, isForThrift);
     }
 
-    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
+    /**
+     * @param columns the columns to return.
+     * @param dataRange filter to use when reading the columns
+     * @return A Scanner for seeking over the rows of the SSTable.
+     */
+    public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
     {
-        return new SSTableSliceIterator(this, key, slices, reverse);
+        return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift);
     }
 
-    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
-    {
-        return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
-    }
     /**
+     * Direct I/O SSTableScanner over the full sstable.
      *
-     * @param dataRange filter to use when reading the columns
-     * @return A Scanner for seeking over the rows of the SSTable.
+     * @return A Scanner for reading the full SSTable.
      */
-    public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
+    public ISSTableScanner getScanner(RateLimiter limiter)
     {
-        return BigTableScanner.getScanner(this, dataRange, limiter);
+        return BigTableScanner.getScanner(this, limiter);
     }
 
-
     /**
      * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
      *
@@ -109,7 +110,7 @@ public class BigTableReader extends SSTableReader
      * @param updateCacheAndStats true if updating stats and cache
      * @return The index entry corresponding to the key, or null if the key is not present
      */
-    protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
+    protected RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
     {
         if (op == Operator.EQ)
         {
@@ -212,7 +213,7 @@ public class BigTableReader extends SSTableReader
                     if (opSatisfied)
                     {
                         // read data position from index entry
-                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
+                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
                         if (exactMatch && updateCacheAndStats)
                         {
                             assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key

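The two iterator() overloads above replace four name/slice variants with a single dispatch on the reversed flag, choosing SSTableReversedIterator or SSTableIterator. A stand-in model of that dispatch (plain java.util iterators instead of SliceableUnfilteredRowIterator):

    import java.util.Iterator;
    import java.util.List;

    public class ReversedDispatchSketch
    {
        static Iterator<Integer> iterator(List<Integer> rows, boolean reversed)
        {
            // forward: natural clustering order; reversed: a dedicated
            // implementation rather than materializing and reversing the partition
            return reversed ? new ReverseIterator(rows) : rows.iterator();
        }

        static final class ReverseIterator implements Iterator<Integer>
        {
            private final List<Integer> rows;
            private int i;
            ReverseIterator(List<Integer> rows) { this.rows = rows; this.i = rows.size() - 1; }
            public boolean hasNext() { return i >= 0; }
            public Integer next() { return rows.get(i--); }
        }

        public static void main(String[] args)
        {
            iterator(List.of(1, 2, 3), true).forEachRemaining(System.out::println); // 3 2 1
        }
    }
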
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index d477152..0794e90 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -22,16 +22,13 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Ordering;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
-import org.apache.cassandra.db.columniterator.LazyColumnIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.AbstractBounds.Boundary;
 import org.apache.cassandra.dht.Bounds;
@@ -57,18 +54,27 @@ public class BigTableScanner implements ISSTableScanner
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
 
-    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
-    private AbstractBounds<RowPosition> currentRange;
+    private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator;
+    private AbstractBounds<PartitionPosition> currentRange;
 
+    private final ColumnFilter columns;
     private final DataRange dataRange;
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    private final boolean isForThrift;
 
-    protected Iterator<OnDiskAtomIterator> iterator;
+    protected UnfilteredPartitionIterator iterator;
 
-    public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+    // Full scan of the sstable
+    public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
     {
-        return new BigTableScanner(sstable, dataRange, limiter);
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, Iterators.singletonIterator(fullRange(sstable)));
     }
+
+    public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
+    {
+        return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator());
+    }
+
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         // We want to avoid allocating a SSTableScanner if the range doesn't overlap the sstable (#5249)
@@ -76,60 +82,54 @@ public class BigTableScanner implements ISSTableScanner
         if (positions.isEmpty())
             return new EmptySSTableScanner(sstable.getFilename());
 
-        return new BigTableScanner(sstable, tokenRanges, limiter);
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator());
     }
 
-    /**
-     * @param sstable SSTable to scan; must not be null
-     * @param dataRange a single range to scan; must not be null
-     * @param limiter background i/o RateLimiter; may be null
-     */
-    private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+    private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
     {
         assert sstable != null;
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
+        this.columns = columns;
         this.dataRange = dataRange;
-        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
-
-        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
-        addRange(dataRange.keyRange(), boundsList);
-        this.rangeIterator = boundsList.iterator();
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+                                                                                                        sstable.descriptor.version,
+                                                                                                        sstable.header);
+        this.isForThrift = isForThrift;
+        this.rangeIterator = rangeIterator;
     }
 
-    /**
-     * @param sstable SSTable to scan; must not be null
-     * @param tokenRanges A set of token ranges to scan
-     * @param limiter background i/o RateLimiter; may be null
-     */
-    private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+    private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
     {
-        assert sstable != null;
-
-        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
-        this.ifile = sstable.openIndexReader();
-        this.sstable = sstable;
-        this.dataRange = null;
-        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
-
-        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(tokenRanges.size());
+        List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size());
         for (Range<Token> range : Range.normalize(tokenRanges))
-            addRange(Range.makeRowRange(range), boundsList);
+            addRange(sstable, Range.makeRowRange(range), boundsList);
+        return boundsList;
+    }
 
-        this.rangeIterator = boundsList.iterator();
+    private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, DataRange dataRange)
+    {
+        List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(2);
+        addRange(sstable, dataRange.keyRange(), boundsList);
+        return boundsList;
+    }
+
+    private static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable)
+    {
+        return new Bounds<PartitionPosition>(sstable.first, sstable.last);
     }
 
-    private void addRange(AbstractBounds<RowPosition> requested, List<AbstractBounds<RowPosition>> boundsList)
+    private static void addRange(SSTableReader sstable, AbstractBounds<PartitionPosition> requested, List<AbstractBounds<PartitionPosition>> boundsList)
     {
         if (requested instanceof Range && ((Range)requested).isWrapAround())
         {
             if (requested.right.compareTo(sstable.first) >= 0)
             {
                 // since we wrap, we must contain the whole sstable prior to stopKey()
-                Boundary<RowPosition> left = new Boundary<RowPosition>(sstable.first, true);
-                Boundary<RowPosition> right;
+                Boundary<PartitionPosition> left = new Boundary<PartitionPosition>(sstable.first, true);
+                Boundary<PartitionPosition> right;
                 right = requested.rightBoundary();
                 right = minRight(right, sstable.last, true);
                 if (!isEmpty(left, right))
@@ -138,8 +138,8 @@ public class BigTableScanner implements ISSTableScanner
             if (requested.left.compareTo(sstable.last) <= 0)
             {
                 // since we wrap, we must contain the whole sstable after dataRange.startKey()
-                Boundary<RowPosition> right = new Boundary<RowPosition>(sstable.last, true);
-                Boundary<RowPosition> left;
+                Boundary<PartitionPosition> right = new Boundary<PartitionPosition>(sstable.last, true);
+                Boundary<PartitionPosition> left;
                 left = requested.leftBoundary();
                 left = maxLeft(left, sstable.first, true);
                 if (!isEmpty(left, right))
@@ -149,12 +149,12 @@ public class BigTableScanner implements ISSTableScanner
         else
         {
             assert requested.left.compareTo(requested.right) <= 0 || requested.right.isMinimum();
-            Boundary<RowPosition> left, right;
+            Boundary<PartitionPosition> left, right;
             left = requested.leftBoundary();
             right = requested.rightBoundary();
             left = maxLeft(left, sstable.first, true);
             // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping
-            right = requested.right.isMinimum() ? new Boundary<RowPosition>(sstable.last, true)
+            right = requested.right.isMinimum() ? new Boundary<PartitionPosition>(sstable.last, true)
                                                     : minRight(right, sstable.last, true);
             if (!isEmpty(left, right))
                 boundsList.add(AbstractBounds.bounds(left, right));
@@ -193,10 +193,18 @@ public class BigTableScanner implements ISSTableScanner
         }
     }
 
-    public void close() throws IOException
+    public void close()
     {
-        if (isClosed.compareAndSet(false, true))
-            FileUtils.close(dfile, ifile);
+        try
+        {
+            if (isClosed.compareAndSet(false, true))
+                FileUtils.close(dfile, ifile);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
     }
 
     public long getLengthInBytes()
@@ -214,6 +222,11 @@ public class BigTableScanner implements ISSTableScanner
         return sstable.toString();
     }
 
+    public boolean isForThrift()
+    {
+        return isForThrift;
+    }
+
     public boolean hasNext()
     {
         if (iterator == null)
@@ -221,7 +234,7 @@ public class BigTableScanner implements ISSTableScanner
         return iterator.hasNext();
     }
 
-    public OnDiskAtomIterator next()
+    public UnfilteredRowIterator next()
     {
         if (iterator == null)
             iterator = createIterator();
@@ -233,19 +246,24 @@ public class BigTableScanner implements ISSTableScanner
         throw new UnsupportedOperationException();
     }
 
-    private Iterator<OnDiskAtomIterator> createIterator()
+    private UnfilteredPartitionIterator createIterator()
     {
         return new KeyScanningIterator();
     }
 
-    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+    protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
     {
         private DecoratedKey nextKey;
         private RowIndexEntry nextEntry;
         private DecoratedKey currentKey;
         private RowIndexEntry currentEntry;
 
-        protected OnDiskAtomIterator computeNext()
+        public boolean isForThrift()
+        {
+            return isForThrift;
+        }
+
+        protected UnfilteredRowIterator computeNext()
         {
             try
             {
@@ -264,7 +282,7 @@ public class BigTableScanner implements ISSTableScanner
                             return endOfData();
 
                         currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+                        currentEntry = rowIndexEntrySerializer.deserialize(ifile);
                     } while (!currentRange.contains(currentKey));
                 }
                 else
@@ -283,7 +301,7 @@ public class BigTableScanner implements ISSTableScanner
                 {
                     // we need the position of the start of the next key, regardless of whether it falls in the current range
                     nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+                    nextEntry = rowIndexEntrySerializer.deserialize(ifile);
 
                     if (!currentRange.contains(nextKey))
                     {
@@ -292,21 +310,34 @@ public class BigTableScanner implements ISSTableScanner
                     }
                 }
 
-                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+                /*
+                 * For a given partition key, we want to avoid hitting the data
+                 * file unless we're explicitly asked to. This is important
+                 * for PartitionRangeReadCommand#checkCacheFilter.
+                 */
+                return new LazilyInitializedUnfilteredRowIterator(currentKey)
                 {
-                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
-                    ByteBufferUtil.readWithShortLength(dfile); // key
-                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
-                }
-
-                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
-                {
-                    public OnDiskAtomIterator create()
+                    protected UnfilteredRowIterator initializeIterator()
                     {
-                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+                        try
+                        {
+                            if (dataRange == null)
+                            {
+                                dfile.seek(currentEntry.position + currentEntry.headerOffset());
+                                ByteBufferUtil.readWithShortLength(dfile); // key
+                                return new SSTableIdentityIterator(sstable, dfile, partitionKey());
+                            }
+
+                            ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
+                            return filter.filter(sstable.iterator(dfile, partitionKey(), currentEntry, columns, filter.isReversed(), isForThrift));
+                        }
+                        catch (CorruptSSTableException | IOException e)
+                        {
+                            sstable.markSuspect();
+                            throw new CorruptSSTableException(e, sstable.getFilename());
+                        }
                     }
-                });
-
+                };
             }
             catch (CorruptSSTableException | IOException e)
             {
@@ -314,6 +345,11 @@ public class BigTableScanner implements ISSTableScanner
                 throw new CorruptSSTableException(e, sstable.getFilename());
             }
         }
+
+        public void close()
+        {
+            BigTableScanner.this.close();
+        }
     }
 
     @Override
@@ -326,7 +362,7 @@ public class BigTableScanner implements ISSTableScanner
                ")";
     }
 
-    public static class EmptySSTableScanner implements ISSTableScanner
+    public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
     {
         private final String filename;
 
@@ -350,20 +386,19 @@ public class BigTableScanner implements ISSTableScanner
             return filename;
         }
 
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
         public boolean hasNext()
         {
             return false;
         }
 
-        public OnDiskAtomIterator next()
+        public UnfilteredRowIterator next()
         {
             return null;
         }
-
-        public void close() throws IOException { }
-
-        public void remove() { }
     }
-
-
 }

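(Editorial sketch, not part of the commit.) The KeyScanningIterator above never touches the data file just to return a partition: the key is read from the index file, and rows are materialized only when the returned iterator is first consumed. A minimal sketch of that lazy-initialization idiom, using generic placeholder names; the real class is LazilyInitializedUnfilteredRowIterator, which also exposes partitionKey() before initialization:

    import java.util.Iterator;

    // Defers the expensive iterator (e.g. a seek into the data file) until
    // the caller actually consumes elements. Placeholder names only.
    abstract class LazilyInitializedIterator<T> implements Iterator<T>
    {
        private Iterator<T> delegate;

        // Performs the expensive work, e.g. dfile.seek(entry.position).
        protected abstract Iterator<T> initialize();

        private Iterator<T> delegate()
        {
            if (delegate == null)
                delegate = initialize();
            return delegate;
        }

        public boolean hasNext()
        {
            return delegate().hasNext();
        }

        public T next()
        {
            return delegate().next();
        }
    }
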
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 30b55a0..66b8ac0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,10 +18,7 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -29,12 +26,13 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
@@ -47,7 +45,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.StreamingHistogram;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.utils.Throwables.merge;
@@ -66,9 +63,9 @@ public class BigTableWriter extends SSTableWriter
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
 
-    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+    public BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
     {
-        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
 
         if (compression)
         {
@@ -124,21 +121,38 @@ public class BigTableWriter extends SSTableWriter
     }
 
     /**
-     * @param row
-     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     * Appends partition data to this writer.
+     *
+     * @param iterator the partition to write
+     * @return the created index entry if something was written, that is, if {@code iterator}
+     * wasn't empty; {@code null} otherwise.
+     *
+     * @throws FSWriteError if a write to the dataFile fails
      */
-    public RowIndexEntry append(AbstractCompactedRow row)
+    public RowIndexEntry append(UnfilteredRowIterator iterator)
     {
-        long startPosition = beforeAppend(row.key);
-        RowIndexEntry entry;
-        try
+        DecoratedKey key = iterator.partitionKey();
+
+        if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+        {
+            logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
+            return null;
+        }
+
+        if (iterator.isEmpty())
+            return null;
+
+        long startPosition = beforeAppend(key);
+
+        try (StatsCollector withStats = new StatsCollector(iterator, metadataCollector))
         {
-            entry = row.write(startPosition, dataFile);
-            if (entry == null)
-                return null;
+            ColumnIndex index = ColumnIndex.writeAndBuildIndex(withStats, dataFile, header, descriptor.version);
+
+            RowIndexEntry entry = RowIndexEntry.create(startPosition, iterator.partitionLevelDeletion(), index);
+
             long endPosition = dataFile.getFilePointer();
-            metadataCollector.update(endPosition - startPosition, row.columnStats());
-            afterAppend(row.key, endPosition, entry);
+            metadataCollector.addPartitionSizeInBytes(endPosition - startPosition);
+            afterAppend(key, endPosition, entry);
             return entry;
         }
         catch (IOException e)
@@ -147,130 +161,77 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
-    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+    private static class StatsCollector extends WrappingUnfilteredRowIterator
     {
-        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-        {
-            logger.error("Key size {} exceeds maximum of {}, skipping row",
-                         decoratedKey.getKey().remaining(),
-                         FBUtilities.MAX_UNSIGNED_SHORT);
-            return;
-        }
+        private int cellCount;
+        private final MetadataCollector collector;
+        private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
 
-        long startPosition = beforeAppend(decoratedKey);
-        long endPosition;
-        try
-        {
-            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
-            endPosition = dataFile.getFilePointer();
-            afterAppend(decoratedKey, endPosition, entry);
-        }
-        catch (IOException e)
+        StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector)
         {
-            throw new FSWriteError(e, dataFile.getPath());
+            super(iter);
+            this.collector = collector;
+            collector.update(iter.partitionLevelDeletion());
         }
-        metadataCollector.update(endPosition - startPosition, cf.getColumnStats());
-    }
-
-    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
-    {
-        assert cf.hasColumns() || cf.isMarkedForDelete();
 
-        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
-        ColumnIndex index = builder.build(cf);
+        @Override
+        public Unfiltered next()
+        {
+            Unfiltered unfiltered = super.next();
+            collector.updateClusteringValues(unfiltered.clustering());
 
-        out.writeShort(END_OF_ROW);
-        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
-    }
+            switch (unfiltered.kind())
+            {
+                case ROW:
+                    Row row = (Row) unfiltered;
+                    collector.update(row.primaryKeyLivenessInfo());
+                    collector.update(row.deletion());
 
-    /**
-     * @throws IOException if a read from the DataInput fails
-     * @throws FSWriteError if a write to the dataFile fails
-     */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
-    {
-        long currentPosition = beforeAppend(key);
+                    int simpleColumnsSet = 0;
+                    complexColumnsSetInRow.clear();
 
-        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
-        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
-        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
-        List<ByteBuffer> minColumnNames = Collections.emptyList();
-        List<ByteBuffer> maxColumnNames = Collections.emptyList();
-        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        boolean hasLegacyCounterShards = false;
+                    for (Cell cell : row)
+                    {
+                        if (cell.column().isComplex())
+                            complexColumnsSetInRow.add(cell.column());
+                        else
+                            ++simpleColumnsSet;
 
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
-        cf.delete(DeletionTime.serializer.deserialize(in));
+                        ++cellCount;
+                        collector.update(cell.livenessInfo());
 
-        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+                        if (cell.isCounterCell())
+                            collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell));
+                    }
 
-        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
-        {
-            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
-            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
-            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-        }
+                    for (int i = 0; i < row.columns().complexColumnCount(); i++)
+                        collector.update(row.getDeletion(row.columns().getComplex(i)));
 
-        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
-        while (rangeTombstoneIterator.hasNext())
-        {
-            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
-            tombstones.update(rangeTombstone.getLocalDeletionTime());
-            minTimestampTracker.update(rangeTombstone.timestamp());
-            maxTimestampTracker.update(rangeTombstone.timestamp());
-            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
-            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
-            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
-        }
+                    collector.updateColumnSetPerRow(simpleColumnsSet + complexColumnsSetInRow.size());
 
-        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
-        try
-        {
-            while (iter.hasNext())
-            {
-                OnDiskAtom atom = iter.next();
-                if (atom == null)
                     break;
-
-                if (atom instanceof CounterCell)
-                {
-                    atom = ((CounterCell) atom).markLocalToBeCleared();
-                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
-                }
-
-                int deletionTime = atom.getLocalDeletionTime();
-                if (deletionTime < Integer.MAX_VALUE)
-                    tombstones.update(deletionTime);
-                minTimestampTracker.update(atom.timestamp());
-                maxTimestampTracker.update(atom.timestamp());
-                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
-                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
-                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
-
-                columnIndexer.add(atom); // This write the atom on disk too
+                case RANGE_TOMBSTONE_MARKER:
+                    if (((RangeTombstoneMarker) unfiltered).isBoundary())
+                    {
+                        RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)unfiltered;
+                        collector.update(bm.endDeletionTime());
+                        collector.update(bm.startDeletionTime());
+                    }
+                    else
+                    {
+                        collector.update(((RangeTombstoneBoundMarker)unfiltered).deletionTime());
+                    }
+                    break;
             }
-
-            columnIndexer.maybeWriteEmptyRowHeader();
-            dataFile.stream.writeShort(END_OF_ROW);
+            return unfiltered;
         }
-        catch (IOException e)
+
+        @Override
+        public void close()
         {
-            throw new FSWriteError(e, dataFile.getPath());
+            collector.addCellPerPartitionCount(cellCount);
+            super.close();
         }
-
-        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
-                         .updateMaxTimestamp(maxTimestampTracker.get())
-                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
-                         .addRowSize(dataFile.getFilePointer() - currentPosition)
-                         .addColumnCount(columnIndexer.writtenAtomCount())
-                         .mergeTombstoneHistogram(tombstones)
-                         .updateMinColumnNames(minColumnNames)
-                         .updateMaxColumnNames(maxColumnNames)
-                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
-
-        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
-        return currentPosition;
     }
 
     private Descriptor makeTmpLinks()
@@ -303,7 +264,7 @@ public class BigTableWriter extends SSTableWriter
                                                            components, metadata,
                                                            partitioner, ifile,
                                                            dfile, iwriter.summary.build(partitioner, boundary),
-                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
+                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
         sstable.first = getMinimalKey(first);
@@ -339,7 +300,8 @@ public class BigTableWriter extends SSTableWriter
                                                            iwriter.bf.sharedCopy(),
                                                            maxDataAge,
                                                            stats,
-                                                           openReason);
+                                                           openReason,
+                                                           header);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
         return sstable;
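
(Editorial sketch, not part of the commit.) StatsCollector above is a wrap-and-observe iterator: metadata is updated as each Unfiltered streams through next(), and per-partition totals are flushed once in close(). A simplified sketch of the pattern, with a LongConsumer standing in for the MetadataCollector calls:

    import java.util.Iterator;
    import java.util.function.LongConsumer;

    // Observes each element as it streams through, then publishes an
    // aggregate when the iteration is closed, mirroring how StatsCollector
    // calls collector.addCellPerPartitionCount(cellCount) in close().
    class ObservingIterator<T> implements Iterator<T>, AutoCloseable
    {
        private final Iterator<T> wrapped;
        private final LongConsumer onClose;
        private long seen;

        ObservingIterator(Iterator<T> wrapped, LongConsumer onClose)
        {
            this.wrapped = wrapped;
            this.onClose = onClose;
        }

        public boolean hasNext()
        {
            return wrapped.hasNext();
        }

        public T next()
        {
            T next = wrapped.next();
            ++seen; // update running statistics exactly once per element
            return next;
        }

        public void close()
        {
            onClose.accept(seen); // flush per-iteration totals
        }
    }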