Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/28 10:04:02 UTC

[1/3] cassandra git commit: Serialize sstable row columns using subset encoding

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 16b9f8bba -> 0d74c3ebf
  refs/heads/trunk f744b6c05 -> b1ad7f0c7


Serialize sstable row columns using subset encoding

Instead of making an sstable-wide sparse/dense coding
decision, this patch encodes all rows using the Columns
subset encoding, which results in a small bitmap for tables
with fewer than 64 columns, and delta encoding for larger ones

patch by benedict; reviewed by sylvain for CASSANDRA-10045
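
For the unfamiliar, the encoding decision reduces to roughly the following
(a standalone sketch, not code from the patch; encodeSubset and
presentIndexes are illustrative names):

    // Sketch: given the sorted indexes (within the header's superset) of the
    // columns present in a row, pick the wire form the patch describes.
    static void encodeSubset(int[] presentIndexes, int supersetSize, DataOutputPlus out) throws IOException
    {
        if (presentIndexes.length == supersetSize)
        {
            out.writeUnsignedVInt(0);            // 0 == all superset columns present
        }
        else if (supersetSize < 64)
        {
            long bitmap = 0L;                    // one bit per superset column;
            int next = 0;                        // a set bit marks a *missing* column
            for (int idx : presentIndexes)
            {
                while (next < idx)
                    bitmap |= 1L << next++;      // skipped columns are missing
                next++;                          // this column is present: bit stays clear
            }
            while (next < supersetSize)
                bitmap |= 1L << next++;          // trailing columns are missing
            out.writeUnsignedVInt(bitmap);
        }
        else
        {
            // 64 or more columns: fall back to the vint-encoded delta form
            // (serializeLargeSubset in the patch).
        }
    }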


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d74c3eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d74c3eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d74c3eb

Branch: refs/heads/cassandra-3.0
Commit: 0d74c3ebf76f4f7875f2a2e5dd25a7a1c1edfc0a
Parents: 16b9f8b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Aug 17 18:41:37 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:00:52 2015 +0100

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Columns.java   |  46 ++--
 .../org/apache/cassandra/db/LegacyLayout.java   |   2 +-
 .../apache/cassandra/db/PartitionColumns.java   |   4 +-
 .../cassandra/db/SerializationHeader.java       |  18 --
 .../columniterator/AbstractSSTableIterator.java |   2 +-
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../apache/cassandra/db/filter/DataLimits.java  |   4 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  12 +
 .../apache/cassandra/db/rows/BufferCell.java    |  31 +--
 .../apache/cassandra/db/rows/EncodingStats.java |  37 +---
 src/java/org/apache/cassandra/db/rows/Row.java  |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   4 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
 .../io/sstable/SSTableIdentityIterator.java     |   2 +-
 .../org/apache/cassandra/db/ColumnsTest.java    |   8 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   2 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  11 +-
 21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index ddb9930..46e8401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * Note that in practice, it will either store only static columns, or only regular ones. When
  * we need both types of columns, we use a {@link PartitionColumns} object.
  */
-public class Columns implements Iterable<ColumnDefinition>
+public class Columns extends AbstractCollection<ColumnDefinition> implements Collection<ColumnDefinition>
 {
     public static final Serializer serializer = new Serializer();
     public static final Columns NONE = new Columns(BTree.empty(), 0);
@@ -136,7 +136,7 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return the total number of columns in this object.
      */
-    public int columnCount()
+    public int size()
     {
         return BTree.size(columns);
     }
@@ -261,14 +261,16 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return whether all the columns of {@code other} are contained by this object.
      */
-    public boolean contains(Columns other)
+    public boolean containsAll(Collection<?> other)
     {
-        if (other.columnCount() > columnCount())
+        if (other == this)
+            return true;
+        if (other.size() > this.size())
             return false;
 
         BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
-        for (ColumnDefinition def : BTree.<ColumnDefinition>iterable(other.columns))
-            if (iter.next(def) == null)
+        for (Object def : other)
+            if (iter.next((ColumnDefinition) def) == null)
                 return false;
         return true;
     }
@@ -379,28 +381,28 @@ public class Columns implements Iterable<ColumnDefinition>
     @Override
     public String toString()
     {
-        StringBuilder sb = new StringBuilder();
+        StringBuilder sb = new StringBuilder("[");
         boolean first = true;
         for (ColumnDefinition def : this)
         {
             if (first) first = false; else sb.append(" ");
             sb.append(def.name);
         }
-        return sb.toString();
+        return sb.append("]").toString();
     }
 
     public static class Serializer
     {
         public void serialize(Columns columns, DataOutputPlus out) throws IOException
         {
-            out.writeVInt(columns.columnCount());
+            out.writeVInt(columns.size());
             for (ColumnDefinition column : columns)
                 ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
         }
 
         public long serializedSize(Columns columns)
         {
-            long size = TypeSizes.sizeofVInt(columns.columnCount());
+            long size = TypeSizes.sizeofVInt(columns.size());
             for (ColumnDefinition column : columns)
                 size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
             return size;
@@ -433,7 +435,7 @@ public class Columns implements Iterable<ColumnDefinition>
          * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
          * more efficiently. Both ends must provide the identical set of columns.
          */
-        public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+        public void serializeSubset(Collection<ColumnDefinition> columns, Columns superset, DataOutputPlus out) throws IOException
         {
             /**
              * We weight this towards small sets, and sets where the majority of items are present, since
@@ -447,8 +449,8 @@ public class Columns implements Iterable<ColumnDefinition>
              * to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient).
              * We indicate this switch by sending our bitmap with every bit set, i.e. -1L
              */
-            int columnCount = columns.columnCount();
-            int supersetCount = superset.columnCount();
+            int columnCount = columns.size();
+            int supersetCount = superset.size();
             if (columnCount == supersetCount)
             {
                 out.writeUnsignedVInt(0);
@@ -463,10 +465,10 @@ public class Columns implements Iterable<ColumnDefinition>
             }
         }
 
-        public long serializedSubsetSize(Columns columns, Columns superset)
+        public long serializedSubsetSize(Collection<ColumnDefinition> columns, Columns superset)
         {
-            int columnCount = columns.columnCount();
-            int supersetCount = superset.columnCount();
+            int columnCount = columns.size();
+            int supersetCount = superset.size();
             if (columnCount == supersetCount)
             {
                 return TypeSizes.sizeofUnsignedVInt(0);
@@ -488,7 +490,7 @@ public class Columns implements Iterable<ColumnDefinition>
             {
                 return superset;
             }
-            else if (superset.columnCount() >= 64)
+            else if (superset.size() >= 64)
             {
                 return deserializeLargeSubset(in, superset, (int) encoded);
             }
@@ -512,7 +514,7 @@ public class Columns implements Iterable<ColumnDefinition>
 
         // encodes a 1 bit for every *missing* column, on the assumption presence is more common,
         // and because this is consistent with encoding 0 to represent all present
-        private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+        private static long encodeBitmap(Collection<ColumnDefinition> columns, Columns superset, int supersetCount)
         {
             long bitmap = 0L;
             BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
@@ -521,7 +523,7 @@ public class Columns implements Iterable<ColumnDefinition>
             for (ColumnDefinition column : columns)
             {
                 if (iter.next(column) == null)
-                    throw new IllegalStateException();
+                    throw new IllegalStateException(columns + " is not a subset of " + superset);
 
                 int currentIndex = iter.indexOfCurrent();
                 int count = currentIndex - expectIndex;
@@ -537,7 +539,7 @@ public class Columns implements Iterable<ColumnDefinition>
         }
 
         @DontInline
-        private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+        private void serializeLargeSubset(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
         {
             // write flag indicating we're in lengthy mode
             out.writeUnsignedVInt(supersetCount - columnCount);
@@ -572,7 +574,7 @@ public class Columns implements Iterable<ColumnDefinition>
         @DontInline
         private Columns deserializeLargeSubset(DataInputPlus in, Columns superset, int delta) throws IOException
         {
-            int supersetCount = superset.columnCount();
+            int supersetCount = superset.size();
             int columnCount = supersetCount - delta;
 
             BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -609,7 +611,7 @@ public class Columns implements Iterable<ColumnDefinition>
         }
 
         @DontInline
-        private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+        private int serializeLargeSubsetSize(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount)
         {
             // write flag indicating we're in lengthy mode
             int size = TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
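
To make the small-subset bitmap above concrete, here is a worked decode (a
hypothetical sketch over plain strings; the real deserializeSubset walks
ColumnDefinitions via a BTree builder):

    // Superset [a, b, c, d] with subset [a, c] encodes as 0b1010 (= 10):
    // bits 1 (b) and 3 (d) are set because those columns are missing.
    static List<String> decodeSmallSubset(List<String> superset, long bitmap)
    {
        List<String> subset = new ArrayList<>();
        for (int i = 0; i < superset.size(); i++)
            if ((bitmap & (1L << i)) == 0)       // clear bit == column present
                subset.add(superset.get(i));
        return subset;
    }

    // decodeSmallSubset(Arrays.asList("a", "b", "c", "d"), 0b1010L) -> [a, c]

An encoded value of 0 short-circuits to the superset itself, and supersets of
64 or more columns take the delta-encoded large form instead
(deserializeLargeSubset), matching deserializeSubset above.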

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 1c72d31..628ac75 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -534,7 +534,7 @@ public abstract class LegacyLayout
             // TODO: there is in practice nothing to do here, but we need to handle the column_metadata for super columns somewhere else
             throw new UnsupportedOperationException();
 
-        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.columnCount());
+        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.size());
         for (ColumnDefinition column : statics)
             columnsToFetch.add(column.name.bytes);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index aa60198..e1008df 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -78,7 +78,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
 
     public boolean includes(PartitionColumns columns)
     {
-        return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+        return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
     }
 
     public Iterator<ColumnDefinition> iterator()
@@ -94,7 +94,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
     /** Returns the total number of static and regular columns. */
     public int size()
     {
-        return regulars.columnCount() + statics.columnCount();
+        return regulars.size() + statics.size();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 88f6832..8d4e604 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -54,9 +54,6 @@ public class SerializationHeader
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
-    // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details.
-    private final boolean useSparseColumnLayout;
-
     private SerializationHeader(AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 PartitionColumns columns,
@@ -68,21 +65,6 @@ public class SerializationHeader
         this.columns = columns;
         this.stats = stats;
         this.typeMap = typeMap;
-
-        // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1
-        // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
-        // only for the columns after the 128th one and for simplicity we assume that once you have that many column,
-        // you'll tend towards a clearly dense or clearly sparse case so that the heurstic above shouldn't still be
-        // too inapropriate). So if on average more than half of our columns are set per row, we better go for sparse.
-        this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount()/ 2);
-    }
-
-    public boolean useSparseColumnLayout(boolean isStatic)
-    {
-        // We always use a dense layout for the static row. Having very many static columns with  only a few set at
-        // any given time doesn't feel very common at all (and we already optimize the case where no static at all
-        // are provided).
-        return !isStatic && useSparseColumnLayout;
     }
 
     public static SerializationHeader forKeyCache(CFMetaData metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 5e6165f..cf4bff7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -214,7 +214,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
     {
         // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
     }
 
     public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4d2e294..a5a1938 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -74,7 +74,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         protected ReusablePartitionData createBuffer(int blocksCount)
         {
             int estimatedRowCount = 16;
-            int columnCount = metadata().partitionColumns().regulars.columnCount();
+            int columnCount = metadata().partitionColumns().regulars.size();
             if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
             {
                 estimatedRowCount = 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3e608b4..0d6f816 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -270,7 +270,7 @@ public abstract class DataLimits
         {
             // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
             // is what getMeanColumns returns)
-            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
             return rowsPerPartition * (cfs.estimateKeys());
         }
 
@@ -506,7 +506,7 @@ public abstract class DataLimits
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // remember that getMeanColumns returns a number of cells: we should clean up the nomenclature
-            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
             return cellsPerPartition * cfs.estimateKeys();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3d2c94b..5e056d2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -529,7 +529,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         if (row.isStatic())
         {
             // We test for == first (now inside containsAll) because in most cases it'll be true and that is faster
-            assert columns().statics == row.columns() || columns().statics.contains(row.columns());
+            assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
             Row staticRow = holder.staticRow.isEmpty()
                       ? row
                       : Rows.merge(holder.staticRow, row, createdAtInSec);
@@ -538,7 +538,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         else
         {
             // We test for == first (now inside containsAll) because in most cases it'll be true and that is faster
-            assert columns().regulars == row.columns() || columns().regulars.contains(row.columns());
+            assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
             rowBuilder.add(row);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 6090274..fca765f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -18,6 +18,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.AbstractCollection;
 import java.util.Objects;
 
 import com.google.common.collect.Iterables;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtilities;
  * Unless you have a very good reason not to, every row implementation
  * should probably extend this class.
  */
-public abstract class AbstractRow implements Row
+public abstract class AbstractRow extends AbstractCollection<ColumnData> implements Row
 {
     public Unfiltered.Kind kind()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 7e50716..ed036af 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -22,6 +22,7 @@ import java.util.*;
 import java.util.function.Predicate;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
@@ -164,6 +166,11 @@ public class BTreeRow extends AbstractRow
         return columns;
     }
 
+    public Collection<ColumnDefinition> actualColumns()
+    {
+        return Collections2.transform(this, ColumnData::column);
+    }
+
     public LivenessInfo primaryKeyLivenessInfo()
     {
         return primaryKeyLivenessInfo;
@@ -207,6 +214,11 @@ public class BTreeRow extends AbstractRow
         return searchIterator();
     }
 
+    public int size()
+    {
+        return BTree.size(btree);
+    }
+
     public Iterable<Cell> cells()
     {
         return CellIterator::new;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index f9a3026..4176ba6 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -218,28 +218,21 @@ public class BufferCell extends AbstractCell
      */
     static class Serializer implements Cell.Serializer
     {
-        private final static int PRESENCE_MASK               = 0x01; // Marks the actual presence of a cell. This is used only when serialized on-disk and
-                                                                     // on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag set).
-        private final static int IS_DELETED_MASK             = 0x02; // Whether the cell is a tombstone or not.
-        private final static int IS_EXPIRING_MASK            = 0x04; // Whether the cell is expiring.
-        private final static int HAS_EMPTY_VALUE_MASK        = 0x08; // Wether the cell has an empty value. This will be the case for tombstone in particular.
-        private final static int USE_ROW_TIMESTAMP_MASK      = 0x10; // Wether the cell has the same timestamp than the row this is a cell of.
-        private final static int USE_ROW_TTL_MASK            = 0x20; // Wether the cell has the same ttl than the row this is a cell of.
+        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
+        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
+        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Whether the cell has the same timestamp as the row it is a cell of.
+        private final static int USE_ROW_TTL_MASK            = 0x10; // Whether the cell has the same ttl as the row it is a cell of.
 
         public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
         {
-            if (cell == null)
-            {
-                out.writeByte((byte)0);
-                return;
-            }
-
+            assert cell != null;
             boolean hasValue = cell.value().hasRemaining();
             boolean isDeleted = cell.isTombstone();
             boolean isExpiring = cell.isExpiring();
             boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
             boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-            int flags = PRESENCE_MASK;
+            int flags = 0;
             if (!hasValue)
                 flags |= HAS_EMPTY_VALUE_MASK;
 
@@ -273,9 +266,6 @@ public class BufferCell extends AbstractCell
         public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
         {
             int flags = in.readUnsignedByte();
-            if ((flags & PRESENCE_MASK) == 0)
-                return null;
-
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
             boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
             boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
@@ -317,10 +307,6 @@ public class BufferCell extends AbstractCell
         public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
         {
             long size = 1; // flags
-
-            if (cell == null)
-                return size;
-
             boolean hasValue = cell.value().hasRemaining();
             boolean isDeleted = cell.isTombstone();
             boolean isExpiring = cell.isExpiring();
@@ -348,9 +334,6 @@ public class BufferCell extends AbstractCell
         public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
         {
             int flags = in.readUnsignedByte();
-            if ((flags & PRESENCE_MASK) == 0)
-                return false;
-
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
             boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
             boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
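
Worth spelling out: with PRESENCE_MASK gone, a flag byte of zero now means a
live, non-empty cell carrying its own timestamp, rather than "no cell here".
Reading the new layout is plain bit testing (a sketch restating the masks
above):

    int flags = in.readUnsignedByte();
    boolean isDeleted       = (flags & 0x01) != 0;   // IS_DELETED_MASK
    boolean isExpiring      = (flags & 0x02) != 0;   // IS_EXPIRING_MASK
    boolean hasValue        = (flags & 0x04) == 0;   // HAS_EMPTY_VALUE_MASK, inverted
    boolean useRowTimestamp = (flags & 0x08) != 0;   // USE_ROW_TIMESTAMP_MASK
    boolean useRowTTL       = (flags & 0x10) != 0;   // USE_ROW_TTL_MASK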

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index efa40ad..5f1a749 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -30,8 +30,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * <p>
  * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
  * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
- * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
- * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * delta-encode that information for the sake of vint encoding.
  * <p>
  * Note that due to their use, those stats can end up somewhat inaccurate (the more inaccurate
  * they are, the less effective the storage will be, but provided the stats are not completely wacky,
@@ -63,7 +62,7 @@ public class EncodingStats
     }
 
     // We should use this sparingly obviously
-    public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH, -1);
+    public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
 
     public static final Serializer serializer = new Serializer();
 
@@ -71,13 +70,9 @@ public class EncodingStats
     public final int minLocalDeletionTime;
     public final int minTTL;
 
-    // Will be < 0 if the value is unknown
-    public final int avgColumnSetPerRow;
-
     public EncodingStats(long minTimestamp,
                          int minLocalDeletionTime,
-                         int minTTL,
-                         int avgColumnSetPerRow)
+                         int minTTL)
     {
         // Note that the exact values of those don't impact correctness, just the efficiency of the encoding. So when we
         // get a value for timestamp (resp. minLocalDeletionTime) that means 'no object had a timestamp' (resp. 'a local
@@ -87,7 +82,6 @@ public class EncodingStats
         this.minTimestamp = minTimestamp == LivenessInfo.NO_TIMESTAMP ? TIMESTAMP_EPOCH : minTimestamp;
         this.minLocalDeletionTime = minLocalDeletionTime == LivenessInfo.NO_EXPIRATION_TIME ? DELETION_TIME_EPOCH : minLocalDeletionTime;
         this.minTTL = minTTL;
-        this.avgColumnSetPerRow = avgColumnSetPerRow;
     }
 
     /**
@@ -110,11 +104,7 @@ public class EncodingStats
                    ? that.minTTL
                    : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
 
-        int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
-                               ? that.avgColumnSetPerRow
-                               : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
-
-        return new EncodingStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+        return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
     @Override
@@ -125,8 +115,7 @@ public class EncodingStats
 
         EncodingStats that = (EncodingStats) o;
 
-        return this.avgColumnSetPerRow == that.avgColumnSetPerRow
-            && this.minLocalDeletionTime == that.minLocalDeletionTime
+        return this.minLocalDeletionTime == that.minLocalDeletionTime
             && this.minTTL == that.minTTL
             && this.minTimestamp == that.minTimestamp;
     }
@@ -134,13 +123,13 @@ public class EncodingStats
     @Override
     public int hashCode()
     {
-        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
     }
 
     @Override
     public String toString()
     {
-        return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d)", minTimestamp, minLocalDeletionTime, minTTL);
     }
 
     public static class Collector implements PartitionStatisticsCollector
@@ -237,8 +226,7 @@ public class EncodingStats
         {
             return new EncodingStats(isTimestampSet ? minTimestamp : TIMESTAMP_EPOCH,
                                      isDelTimeSet ? minDeletionTime : DELETION_TIME_EPOCH,
-                                     isTTLSet ? minTTL : TTL_EPOCH,
-                                     isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+                                     isTTLSet ? minTTL : TTL_EPOCH);
         }
 
         public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
@@ -260,15 +248,13 @@ public class EncodingStats
             out.writeVInt(stats.minTimestamp - TIMESTAMP_EPOCH);
             out.writeVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH);
             out.writeVInt(stats.minTTL - TTL_EPOCH);
-            out.writeVInt(stats.avgColumnSetPerRow);
         }
 
         public int serializedSize(EncodingStats stats)
         {
             return TypeSizes.sizeofVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
+                   + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
+                   + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH);
         }
 
         public EncodingStats deserialize(DataInputPlus in) throws IOException
@@ -276,8 +262,7 @@ public class EncodingStats
             long minTimestamp = in.readVInt() + TIMESTAMP_EPOCH;
             int minLocalDeletionTime = (int)in.readVInt() + DELETION_TIME_EPOCH;
             int minTTL = (int)in.readVInt() + TTL_EPOCH;
-            int avgColumnSetPerRow = (int)in.readVInt();
-            return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+            return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
         }
     }
 }
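
The stats that remain exist to keep vints short: values are written as deltas
against these per-sstable minimums, and small deltas cost only a byte or two.
A toy illustration (hypothetical values; writeVInt is the DataOutputPlus call
the serializer above uses):

    // Delta-encoding a timestamp against the sstable-wide minimum:
    long minTimestamp  = 1440748800000000L;          // hypothetical stats.minTimestamp
    long cellTimestamp = 1440748800000042L;
    out.writeVInt(cellTimestamp - minTimestamp);     // vint(42): a single byte
    // out.writeLong(cellTimestamp) would always cost 8 bytes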

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 996e89a..003dd04 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * its own data. For instance, a {@code Row} cannot contain a cell that is deleted by its own
  * row deletion.
  */
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Collection<ColumnData>
 {
     /**
      * The clustering values for this row.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 60f0dcb..48e00f9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -98,6 +98,8 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
     {
+        PartitionColumns columns = staticRow == null ? PartitionColumns.NONE
+                                                     : new PartitionColumns(staticRow.columns(), Columns.NONE);
         return new UnfilteredRowIterator()
         {
             public CFMetaData metadata()
@@ -112,7 +114,7 @@ public abstract class UnfilteredRowIterators
 
             public PartitionColumns columns()
             {
-                return PartitionColumns.NONE;
+                return columns;
             }
 
             public DecoratedKey partitionKey()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 14b06cf..0866810 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,11 +19,12 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 
+import com.google.common.collect.Collections2;
+
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.SearchIterator;
 
 /**
  * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -37,29 +38,20 @@ import org.apache.cassandra.utils.SearchIterator;
  *       {@code Clustering.serializer}. Note that static rows are an exception and
  *       don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
  *       whose presence is determined by the flags. <sci> are the simple columns of the row and <ccj> the
- *       complex ones.  There is actually 2 slightly different possible layout for those
- *       cell: a dense one and a sparse one. Which one is used depends on the serialization
- *       header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}:
- *         1) in the dense layout, there will be as many <sci> and <ccj> as there is columns
- *            in the serialization header. *Each simple column <sci> will simply be a <cell>
- *            (which might have no value, see below), while each <ccj> will be
- *             [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
- *             this complex column (if flags indicates it present), <celln> are the <cell>
- *             for this complex column and <emptyCell> is a last cell that will have no value
- *             to indicate the end of this column.
- *         2) in the sparse layout, there won't be "empty" cells, i.e. only the column that
- *            actually have a cell are represented. For that, each <sci> and <ccj> start
- *            by a 2 byte index that points to the column in the header it belongs to. After
- *            that, each <sci> and <ccj> is the same than for the dense layout. But contrarily
- *            to the dense layout we won't know how many elements are serialized so a 2 byte
- *            marker with a value of -1 will indicates the end of the row.
+ *       complex ones.
+ *       The columns for the row are then serialized if they differ from those in the header,
+ *       and each cell then follows:
+ *         * Each simple column <sci> will simply be a <cell>
+ *           (which might have no value, see below),
+ *         * Each <ccj> will be [<delTime>]<n><cell1>...<celln> where <delTime>
+ *           is the deletion for this complex column (if the flags indicate it is present), <n>
+ *           is the vint-encoded number of cells that follow, and <celli>
+ *           are the <cell> for this complex column
  *   <marker> is <bound><deletion> where <bound> is the marker bound as serialized
  *       by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
  *       time.
  *
- *   <cell> A cell start with a 1 byte <flag>. Thre rightmost 1st bit indicates
- *       if there is actually a value for this cell. If this flag is unset,
- *       nothing more follows for the cell. The 2nd and third flag indicates if
+ *   <cell> A cell starts with a 1 byte <flag>. The rightmost 1st and 2nd flag bits indicate if
 *       it's a deleted or expiring cell. The 3rd flag indicates if the value
 *       is empty or not. The 4th and 5th indicate if the timestamp and ttl/
 *       localDeletionTime for the cell are the same as the row's (if that
@@ -85,6 +77,7 @@ public class UnfilteredSerializer
     private final static int HAS_TTL              = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
     private final static int HAS_DELETION         = 0x20; // Whether the encoded row has some deletion info.
     private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
+    private final static int HAS_ALL_COLUMNS      = 0x80; // Whether the encoded row has all of the columns from the header present
 
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
     throws IOException
@@ -105,9 +98,11 @@ public class UnfilteredSerializer
         int flags = 0;
         boolean isStatic = row.isStatic();
 
+        Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         DeletionTime deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
+        boolean hasAllColumns = (row.size() == headerColumns.size());
 
         if (isStatic)
             flags |= IS_STATIC;
@@ -119,6 +114,8 @@ public class UnfilteredSerializer
             flags |= HAS_DELETION;
         if (hasComplexDeletion)
             flags |= HAS_COMPLEX_DELETION;
+        if (hasAllColumns)
+            flags |= HAS_ALL_COLUMNS;
 
         out.writeByte((byte)flags);
         if (!isStatic)
@@ -134,55 +131,27 @@ public class UnfilteredSerializer
         if ((flags & HAS_DELETION) != 0)
             header.writeDeletionTime(deletion, out);
 
-        Columns columns = header.columns(isStatic);
-        int simpleCount = columns.simpleColumnCount();
-        boolean useSparse = header.useSparseColumnLayout(isStatic);
-        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
-        for (int i = 0; i < simpleCount; i++)
-            writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse);
+        if (!hasAllColumns)
+            Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
 
-        for (int i = simpleCount; i < columns.columnCount(); i++)
-            writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse);
-
-        if (useSparse)
-            out.writeVInt(-1);
-    }
-
-    private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
-    throws IOException
-    {
-        if (useSparse)
+        for (ColumnData data : row)
         {
-            if (cell == null)
-                return;
-
-            out.writeVInt(idx);
+            if (data.column.isSimple())
+                Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
+            else
+                writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
         }
-        Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
+    private void writeComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out)
     throws IOException
     {
-        if (useSparse)
-        {
-            if (data == null)
-                return;
-
-            out.writeVInt(idx);
-        }
-
         if (hasComplexDeletion)
             header.writeDeletionTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), out);
 
-        if (data != null)
-        {
-            for (Cell cell : data)
-                Cell.serializer.serialize(cell, out, rowLiveness, header);
-        }
-
-        Cell.serializer.serialize(null, out, rowLiveness, header);
+        out.writeUnsignedVInt(data.cellsCount());
+        for (Cell cell : data)
+            Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
     public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
@@ -215,9 +184,11 @@ public class UnfilteredSerializer
         long size = 1; // flags
 
         boolean isStatic = row.isStatic();
+        Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         DeletionTime deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
+        boolean hasAllColumns = (row.size() == headerColumns.size());
 
         if (!isStatic)
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
@@ -232,57 +203,32 @@ public class UnfilteredSerializer
         if (!deletion.isLive())
             size += header.deletionTimeSerializedSize(deletion);
 
-        Columns columns = header.columns(isStatic);
-        int simpleCount = columns.simpleColumnCount();
-        boolean useSparse = header.useSparseColumnLayout(isStatic);
-        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
-        for (int i = 0; i < simpleCount; i++)
-            size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse);
-
-        for (int i = simpleCount; i < columns.columnCount(); i++)
-            size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse);
-
-        if (useSparse)
-            size += TypeSizes.sizeofVInt(-1);
+        if (!hasAllColumns)
+            size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
 
-        return size;
-    }
-
-    private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
-    {
-        long size = 0;
-        if (useSparse)
+        for (ColumnData data : row)
         {
-            if (cell == null)
-                return size;
-
-            size += TypeSizes.sizeofVInt(idx);
+            if (data.column.isSimple())
+                size += Cell.serializer.serializedSize((Cell) data, pkLiveness, header);
+            else
+                size += sizeOfComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header);
         }
-        return size + Cell.serializer.serializedSize(cell, rowLiveness, header);
+
+        return size;
     }
 
-    private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
+    private long sizeOfComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
     {
         long size = 0;
-        if (useSparse)
-        {
-            if (data == null)
-                return size;
-
-            size += TypeSizes.sizeofVInt(idx);
-        }
 
         if (hasComplexDeletion)
             size += header.deletionTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion());
 
-        if (data != null)
-        {
-            for (Cell cell : data)
-                size += Cell.serializer.serializedSize(cell, rowLiveness, header);
-        }
+        size += TypeSizes.sizeofUnsignedVInt(data.cellsCount());
+        for (Cell cell : data)
+            size += Cell.serializer.serializedSize(cell, rowLiveness, header);
 
-        return size + Cell.serializer.serializedSize(null, rowLiveness, header);
+        return size;
     }
 
     public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
@@ -369,6 +315,8 @@ public class UnfilteredSerializer
             boolean hasTTL = (flags & HAS_TTL) != 0;
             boolean hasDeletion = (flags & HAS_DELETION) != 0;
             boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+            boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+            Columns headerColumns = header.columns(isStatic);
 
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
@@ -382,33 +330,16 @@ public class UnfilteredSerializer
             builder.addPrimaryKeyLivenessInfo(rowLiveness);
             builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
 
-            Columns columns = header.columns(isStatic);
-            if (header.useSparseColumnLayout(isStatic))
-            {
-                int count = columns.columnCount();
-                int simpleCount = columns.simpleColumnCount();
-                int i;
-                while ((i = (int)in.readVInt()) >= 0)
-                {
-                    if (i > count)
-                        throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
-                    if (i < simpleCount)
-                        readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-                    else
-                        readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
-                }
-            }
-            else
+            Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+            for (ColumnDefinition column : columns)
             {
-                for (int i = 0; i < columns.simpleColumnCount(); i++)
-                    readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-
-                for (int i = 0; i < columns.complexColumnCount(); i++)
-                    readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+                if (column.isSimple())
+                    readSimpleColumn(column, in, header, helper, builder, rowLiveness);
+                else
+                    readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
             }
 
-                return builder.build();
+            return builder.build();
         }
         catch (RuntimeException | AssertionError e)
         {
@@ -426,7 +357,7 @@ public class UnfilteredSerializer
         if (helper.includes(column))
         {
             Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
-            if (cell != null && !helper.isDropped(cell, false))
+            if (!helper.isDropped(cell, false))
                 builder.addCell(cell);
         }
         else
@@ -448,9 +379,10 @@ public class UnfilteredSerializer
                     builder.addComplexDeletion(column, complexDeletion);
             }
 
-            Cell cell;
-            while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null)
+            int count = (int) in.readUnsignedVInt();
+            while (--count >= 0)
             {
+                Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
                 if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
                     builder.addCell(cell);
             }
@@ -470,6 +402,8 @@ public class UnfilteredSerializer
         boolean hasTTL = (flags & HAS_TTL) != 0;
         boolean hasDeletion = (flags & HAS_DELETION) != 0;
         boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+        boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+        Columns headerColumns = header.columns(isStatic);
 
         // Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
         // the size we think due to VINT encoding
@@ -483,30 +417,13 @@ public class UnfilteredSerializer
         if (hasDeletion)
             header.skipDeletionTime(in);
 
-        Columns columns = header.columns(isStatic);
-        if (header.useSparseColumnLayout(isStatic))
-        {
-            int count = columns.columnCount();
-            int simpleCount = columns.simpleColumnCount();
-            int i;
-            while ((i = (int)in.readVInt()) >= 0)
-            {
-                if (i > count)
-                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
-                if (i < simpleCount)
-                    Cell.serializer.skip(in, columns.getSimple(i), header);
-                else
-                    skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion);
-            }
-        }
-        else
+        Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+        for (ColumnDefinition column : columns)
         {
-            for (int i = 0; i < columns.simpleColumnCount(); i++)
-                Cell.serializer.skip(in, columns.getSimple(i), header);
-
-            for (int i = 0; i < columns.complexColumnCount(); i++)
-                skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion);
+            if (column.isSimple())
+                Cell.serializer.skip(in, column, header);
+            else
+                skipComplexColumn(in, column, header, hasComplexDeletion);
         }
     }
 
@@ -536,7 +453,9 @@ public class UnfilteredSerializer
         if (hasComplexDeletion)
             header.skipDeletionTime(in);
 
-        while (Cell.serializer.skip(in, column, header));
+        int count = (int) in.readUnsignedVInt();
+        while (--count >= 0)
+            Cell.serializer.skip(in, column, header);
     }
 
     public static boolean isEndOfPartition(int flags)
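
The complex-column change is easiest to see in isolation: the old format
terminated each complex column with a sentinel "absent" cell (the dropped
PRESENCE_MASK), while the new one writes the cell count up front, so readers
and skippers consume exactly <n> cells. In sketch form (mirroring
writeComplexColumn and skipComplexColumn above):

    // Writer: an up-front unsigned vint count replaces the trailing sentinel.
    out.writeUnsignedVInt(data.cellsCount());        // <n>
    for (Cell cell : data)
        Cell.serializer.serialize(cell, out, rowLiveness, header);

    // Reader: consume exactly <n> cells, no probing for an end marker.
    int count = (int) in.readUnsignedVInt();
    while (--count >= 0)
        Cell.serializer.skip(in, column, header);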

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 793cd81..acdf6bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -127,7 +127,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
     {
         // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
     }
 
     public int compareTo(SSTableIdentityIterator o)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 1a91c3d..c080948 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -83,9 +83,9 @@ public class ColumnsTest
 
     private void assertSubset(Columns superset, Columns subset)
     {
-        Assert.assertTrue(superset.contains(superset));
-        Assert.assertTrue(superset.contains(subset));
-        Assert.assertFalse(subset.contains(superset));
+        Assert.assertTrue(superset.containsAll(superset));
+        Assert.assertTrue(superset.containsAll(subset));
+        Assert.assertFalse(subset.containsAll(superset));
     }
 
     @Test
@@ -275,7 +275,7 @@ public class ColumnsTest
             Columns subset = columns;
             for (ColumnDefinition def : remove)
                 subset = subset.without(def);
-            Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+            Assert.assertEquals(columns.size() - remove.size(), subset.size());
             List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
             remainingDefs.removeAll(remove);
             return new ColumnsCheck(subset, remainingDefs);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index f651093..f0a63a8 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -105,7 +105,7 @@ public class PartitionTest
 
         CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
 
-        assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
+        assertEquals(partition.columns().regulars.size(), deserialized.columns().regulars.size());
         assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
         assertTrue(deserialized.columns().regulars.getSimple(5).equals(partition.columns().regulars.getSimple(5)));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 3aaf21f..cd80a2f 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -141,7 +141,7 @@ public class RowTest
         row = (Row) unfiltered;
         assertEquals("a2", defA.cellValueType().getString(row.getCell(defA).value()));
         assertEquals("b1", defB.cellValueType().getString(row.getCell(defB).value()));
-        assertEquals(2, row.columns().columnCount());
+        assertEquals(2, row.columns().size());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 5887fd4..e9bf4c5 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -325,8 +326,9 @@ public class ScrubTest
                                                                    keys.size(),
                                                                    0L,
                                                                    0,
-                                                                   SerializationHeader.make(cfs.metadata,
-                                                                                            Collections.emptyList())))
+                                                                   new SerializationHeader(cfs.metadata,
+                                                                                           cfs.metadata.partitionColumns(),
+                                                                                           EncodingStats.NO_STATS)))
             {
 
                 for (String k : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 89c0d61..7826317 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -179,9 +179,17 @@ public class SSTableUtils
 
         public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
         {
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+            for (PartitionUpdate update : sorted.values())
+                builder.addAll(update.columns());
             final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
             return write(sorted.size(), new Appender()
             {
+                public SerializationHeader header()
+                {
+                    return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+                }
+
                 @Override
                 public boolean append(SSTableTxnWriter writer) throws IOException
                 {
@@ -207,7 +215,7 @@ public class SSTableUtils
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
             CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
             ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
-            SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+            SerializationHeader header = appender.header();
             SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
             while (appender.append(writer)) { /* pass */ }
             Collection<SSTableReader> readers = writer.finish(true);
@@ -223,6 +231,7 @@ public class SSTableUtils
 
     public static abstract class Appender
     {
+        public abstract SerializationHeader header();
         /** Called with an open writer until it returns false. */
         public abstract boolean append(SSTableTxnWriter writer) throws IOException;
     }


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1ad7f0c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1ad7f0c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1ad7f0c

Branch: refs/heads/trunk
Commit: b1ad7f0c7fb3b6799dc7babdf65fcbdcd2331d6e
Parents: f744b6c 0d74c3e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 28 09:03:41 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:03:41 2015 +0100

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Columns.java   |  46 ++--
 .../org/apache/cassandra/db/LegacyLayout.java   |   2 +-
 .../apache/cassandra/db/PartitionColumns.java   |   4 +-
 .../cassandra/db/SerializationHeader.java       |  18 --
 .../columniterator/AbstractSSTableIterator.java |   2 +-
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../apache/cassandra/db/filter/DataLimits.java  |   4 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  12 +
 .../apache/cassandra/db/rows/BufferCell.java    |  31 +--
 .../apache/cassandra/db/rows/EncodingStats.java |  37 +---
 src/java/org/apache/cassandra/db/rows/Row.java  |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   4 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
 .../io/sstable/SSTableIdentityIterator.java     |   2 +-
 .../org/apache/cassandra/db/ColumnsTest.java    |   8 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   2 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  11 +-
 21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------



[2/3] cassandra git commit: Serialize sstable row columns using subset encoding

Posted by be...@apache.org.
Serialize sstable row columns using subset encoding

Instead of making an sstable-wide sparse/dense coding
decision, this patch encodes all rows using the Columns
subset encoding, that results in a small bitmap for tables
with fewer than 64 columns, and delta encoding when larger

patch by benedict; reviewed by sylvain for CASSANDRA-10045
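
The heart of the change is the Columns subset encoding: when the header's
superset has at most 64 columns, a row's column set can be written as a single
vint whose set bits mark the *missing* columns, so that 0 means "all columns
present". A minimal sketch of that idea over plain strings (illustrative only;
the real Columns.Serializer below writes this as an unsigned vint and switches
to vint-encoded deltas, signalled by sending -1L, once the superset reaches 64
columns):

    import java.util.*;

    public class SubsetBitmapSketch
    {
        // bit i set <=> superset.get(i) is absent from the subset; 0 == all present
        static long encodeMissing(List<String> subset, List<String> superset)
        {
            long bitmap = 0L;
            Set<String> present = new HashSet<>(subset);
            for (int i = 0; i < superset.size(); i++)
                if (!present.contains(superset.get(i)))
                    bitmap |= 1L << i;
            return bitmap;
        }

        // keep every superset column whose bit is clear
        static List<String> decode(long bitmap, List<String> superset)
        {
            List<String> columns = new ArrayList<>();
            for (int i = 0; i < superset.size(); i++)
                if ((bitmap & (1L << i)) == 0)
                    columns.add(superset.get(i));
            return columns;
        }

        public static void main(String[] args)
        {
            List<String> superset = Arrays.asList("a", "b", "c", "d");
            long bitmap = encodeMissing(Arrays.asList("a", "c"), superset);
            System.out.println(Long.toBinaryString(bitmap)); // 1010: b and d missing
            System.out.println(decode(bitmap, superset));    // [a, c]
        }
    }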


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d74c3eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d74c3eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d74c3eb

Branch: refs/heads/trunk
Commit: 0d74c3ebf76f4f7875f2a2e5dd25a7a1c1edfc0a
Parents: 16b9f8b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Aug 17 18:41:37 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:00:52 2015 +0100

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Columns.java   |  46 ++--
 .../org/apache/cassandra/db/LegacyLayout.java   |   2 +-
 .../apache/cassandra/db/PartitionColumns.java   |   4 +-
 .../cassandra/db/SerializationHeader.java       |  18 --
 .../columniterator/AbstractSSTableIterator.java |   2 +-
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../apache/cassandra/db/filter/DataLimits.java  |   4 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  12 +
 .../apache/cassandra/db/rows/BufferCell.java    |  31 +--
 .../apache/cassandra/db/rows/EncodingStats.java |  37 +---
 src/java/org/apache/cassandra/db/rows/Row.java  |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   4 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
 .../io/sstable/SSTableIdentityIterator.java     |   2 +-
 .../org/apache/cassandra/db/ColumnsTest.java    |   8 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   2 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  11 +-
 21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index ddb9930..46e8401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * Note that in practice, it will either store only static columns, or only regular ones. When
  * we need both type of columns, we use a {@link PartitionColumns} object.
  */
-public class Columns implements Iterable<ColumnDefinition>
+public class Columns extends AbstractCollection<ColumnDefinition> implements Collection<ColumnDefinition>
 {
     public static final Serializer serializer = new Serializer();
     public static final Columns NONE = new Columns(BTree.empty(), 0);
@@ -136,7 +136,7 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return the total number of columns in this object.
      */
-    public int columnCount()
+    public int size()
     {
         return BTree.size(columns);
     }
@@ -261,14 +261,16 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return whether all the columns of {@code other} are contained by this object.
      */
-    public boolean contains(Columns other)
+    public boolean containsAll(Collection<?> other)
     {
-        if (other.columnCount() > columnCount())
+        if (other == this)
+            return true;
+        if (other.size() > this.size())
             return false;
 
         BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
-        for (ColumnDefinition def : BTree.<ColumnDefinition>iterable(other.columns))
-            if (iter.next(def) == null)
+        for (Object def : other)
+            if (iter.next((ColumnDefinition) def) == null)
                 return false;
         return true;
     }
@@ -379,28 +381,28 @@ public class Columns implements Iterable<ColumnDefinition>
     @Override
     public String toString()
     {
-        StringBuilder sb = new StringBuilder();
+        StringBuilder sb = new StringBuilder("[");
         boolean first = true;
         for (ColumnDefinition def : this)
         {
             if (first) first = false; else sb.append(" ");
             sb.append(def.name);
         }
-        return sb.toString();
+        return sb.append("]").toString();
     }
 
     public static class Serializer
     {
         public void serialize(Columns columns, DataOutputPlus out) throws IOException
         {
-            out.writeVInt(columns.columnCount());
+            out.writeVInt(columns.size());
             for (ColumnDefinition column : columns)
                 ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
         }
 
         public long serializedSize(Columns columns)
         {
-            long size = TypeSizes.sizeofVInt(columns.columnCount());
+            long size = TypeSizes.sizeofVInt(columns.size());
             for (ColumnDefinition column : columns)
                 size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
             return size;
@@ -433,7 +435,7 @@ public class Columns implements Iterable<ColumnDefinition>
          * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
          * more efficiently. Both ends must provide the identical set of columns.
          */
-        public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+        public void serializeSubset(Collection<ColumnDefinition> columns, Columns superset, DataOutputPlus out) throws IOException
         {
             /**
              * We weight this towards small sets, and sets where the majority of items are present, since
@@ -447,8 +449,8 @@ public class Columns implements Iterable<ColumnDefinition>
              * to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient).
              * We indicate this switch by sending our bitmap with every bit set, i.e. -1L
              */
-            int columnCount = columns.columnCount();
-            int supersetCount = superset.columnCount();
+            int columnCount = columns.size();
+            int supersetCount = superset.size();
             if (columnCount == supersetCount)
             {
                 out.writeUnsignedVInt(0);
@@ -463,10 +465,10 @@ public class Columns implements Iterable<ColumnDefinition>
             }
         }
 
-        public long serializedSubsetSize(Columns columns, Columns superset)
+        public long serializedSubsetSize(Collection<ColumnDefinition> columns, Columns superset)
         {
-            int columnCount = columns.columnCount();
-            int supersetCount = superset.columnCount();
+            int columnCount = columns.size();
+            int supersetCount = superset.size();
             if (columnCount == supersetCount)
             {
                 return TypeSizes.sizeofUnsignedVInt(0);
@@ -488,7 +490,7 @@ public class Columns implements Iterable<ColumnDefinition>
             {
                 return superset;
             }
-            else if (superset.columnCount() >= 64)
+            else if (superset.size() >= 64)
             {
                 return deserializeLargeSubset(in, superset, (int) encoded);
             }
@@ -512,7 +514,7 @@ public class Columns implements Iterable<ColumnDefinition>
 
         // encodes a 1 bit for every *missing* column, on the assumption presence is more common,
         // and because this is consistent with encoding 0 to represent all present
-        private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+        private static long encodeBitmap(Collection<ColumnDefinition> columns, Columns superset, int supersetCount)
         {
             long bitmap = 0L;
             BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
@@ -521,7 +523,7 @@ public class Columns implements Iterable<ColumnDefinition>
             for (ColumnDefinition column : columns)
             {
                 if (iter.next(column) == null)
-                    throw new IllegalStateException();
+                    throw new IllegalStateException(columns + " is not a subset of " + superset);
 
                 int currentIndex = iter.indexOfCurrent();
                 int count = currentIndex - expectIndex;
@@ -537,7 +539,7 @@ public class Columns implements Iterable<ColumnDefinition>
         }
 
         @DontInline
-        private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+        private void serializeLargeSubset(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
         {
             // write flag indicating we're in lengthy mode
             out.writeUnsignedVInt(supersetCount - columnCount);
@@ -572,7 +574,7 @@ public class Columns implements Iterable<ColumnDefinition>
         @DontInline
         private Columns deserializeLargeSubset(DataInputPlus in, Columns superset, int delta) throws IOException
         {
-            int supersetCount = superset.columnCount();
+            int supersetCount = superset.size();
             int columnCount = supersetCount - delta;
 
             BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -609,7 +611,7 @@ public class Columns implements Iterable<ColumnDefinition>
         }
 
         @DontInline
-        private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+        private int serializeLargeSubsetSize(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount)
         {
             // write flag indicating we're in lengthy mode
             int size = TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
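
Because Columns keeps its definitions sorted, the containsAll above can verify
containment in a single merge-style pass over both sets instead of one lookup
per element. A small sketch of the same idea over sorted lists (illustrative
only; BTreeSearchIterator plays this role in the real code):

    import java.util.*;

    public class SortedContainsAll
    {
        // one ordered pass: advance through 'superset' looking for each
        // element of 'other' in turn; both lists must be sorted the same way
        static <T extends Comparable<T>> boolean containsAll(List<T> superset, List<T> other)
        {
            if (other.size() > superset.size())
                return false;
            int i = 0;
            for (T sought : other)
            {
                while (i < superset.size() && superset.get(i).compareTo(sought) < 0)
                    i++;
                if (i == superset.size() || !superset.get(i).equals(sought))
                    return false;
                i++;
            }
            return true;
        }

        public static void main(String[] args)
        {
            List<String> superset = Arrays.asList("a", "b", "c", "d");
            System.out.println(containsAll(superset, Arrays.asList("b", "d"))); // true
            System.out.println(containsAll(superset, Arrays.asList("b", "e"))); // false
        }
    }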

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 1c72d31..628ac75 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -534,7 +534,7 @@ public abstract class LegacyLayout
             // TODO: there is in practice nothing to do here, but we need to handle the column_metadata for super columns somewhere else
             throw new UnsupportedOperationException();
 
-        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.columnCount());
+        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.size());
         for (ColumnDefinition column : statics)
             columnsToFetch.add(column.name.bytes);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index aa60198..e1008df 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -78,7 +78,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
 
     public boolean includes(PartitionColumns columns)
     {
-        return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+        return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
     }
 
     public Iterator<ColumnDefinition> iterator()
@@ -94,7 +94,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
     /** Returns the total number of static and regular columns. */
     public int size()
     {
-        return regulars.columnCount() + statics.columnCount();
+        return regulars.size() + statics.size();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 88f6832..8d4e604 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -54,9 +54,6 @@ public class SerializationHeader
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
-    // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details.
-    private final boolean useSparseColumnLayout;
-
     private SerializationHeader(AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 PartitionColumns columns,
@@ -68,21 +65,6 @@ public class SerializationHeader
         this.columns = columns;
         this.stats = stats;
         this.typeMap = typeMap;
-
-        // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1
-        // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
-        // only for the columns after the 128th one and for simplicity we assume that once you have that many column,
-        // you'll tend towards a clearly dense or clearly sparse case so that the heurstic above shouldn't still be
-        // too inapropriate). So if on average more than half of our columns are set per row, we better go for sparse.
-        this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount()/ 2);
-    }
-
-    public boolean useSparseColumnLayout(boolean isStatic)
-    {
-        // We always use a dense layout for the static row. Having very many static columns with  only a few set at
-        // any given time doesn't feel very common at all (and we already optimize the case where no static at all
-        // are provided).
-        return !isStatic && useSparseColumnLayout;
     }
 
     public static SerializationHeader forKeyCache(CFMetaData metadata)
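
The removed heuristic, in rough numbers: the dense layout paid about one flag
byte per *absent* column, the sparse layout about one vint index byte per
*present* column plus an end marker, hence "sparse if the average column set is
at most half the regulars". The subset encoding replaces both with a one-vint
column bitmap per row for tables under 64 columns. A back-of-the-envelope
sketch (approximate overheads as described in the removed comment, not
measured sizes):

    public class LayoutOverheadSketch
    {
        public static void main(String[] args)
        {
            int regulars = 10; // regular columns defined by the table
            for (int set : new int[]{ 2, 5, 8 })
            {
                int dense  = regulars - set; // ~1 flag byte per absent column
                int sparse = set + 1;        // ~1 vint index per present column + end marker
                int subset = 1;              // ~1 vint bitmap for the whole row
                System.out.printf("columns set=%d: dense~%d, sparse~%d, subset~%d extra bytes%n",
                                  set, dense, sparse, subset);
            }
        }
    }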

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 5e6165f..cf4bff7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -214,7 +214,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
     {
         // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
     }
 
     public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4d2e294..a5a1938 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -74,7 +74,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         protected ReusablePartitionData createBuffer(int blocksCount)
         {
             int estimatedRowCount = 16;
-            int columnCount = metadata().partitionColumns().regulars.columnCount();
+            int columnCount = metadata().partitionColumns().regulars.size();
             if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
             {
                 estimatedRowCount = 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3e608b4..0d6f816 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -270,7 +270,7 @@ public abstract class DataLimits
         {
             // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
             // is what getMeanColumns returns)
-            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
             return rowsPerPartition * (cfs.estimateKeys());
         }
 
@@ -506,7 +506,7 @@ public abstract class DataLimits
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // remember that getMeanColumns returns a number of cells: we should clean up the nomenclature
-            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
             return cellsPerPartition * cfs.estimateKeys();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3d2c94b..5e056d2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -529,7 +529,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         if (row.isStatic())
         {
             // We test for == first because in most cases it'll be true and that is faster
-            assert columns().statics == row.columns() || columns().statics.contains(row.columns());
+            assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
             Row staticRow = holder.staticRow.isEmpty()
                       ? row
                       : Rows.merge(holder.staticRow, row, createdAtInSec);
@@ -538,7 +538,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         else
         {
             // We test for == first because in most cases it'll be true and that is faster
-            assert columns().regulars == row.columns() || columns().regulars.contains(row.columns());
+            assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
             rowBuilder.add(row);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 6090274..fca765f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -18,6 +18,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.AbstractCollection;
 import java.util.Objects;
 
 import com.google.common.collect.Iterables;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtilities;
  * Unless you have a very good reason not to, every row implementation
  * should probably extend this class.
  */
-public abstract class AbstractRow implements Row
+public abstract class AbstractRow extends AbstractCollection<ColumnData> implements Row
 {
     public Unfiltered.Kind kind()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 7e50716..ed036af 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -22,6 +22,7 @@ import java.util.*;
 import java.util.function.Predicate;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
@@ -164,6 +166,11 @@ public class BTreeRow extends AbstractRow
         return columns;
     }
 
+    public Collection<ColumnDefinition> actualColumns()
+    {
+        return Collections2.transform(this, ColumnData::column);
+    }
+
     public LivenessInfo primaryKeyLivenessInfo()
     {
         return primaryKeyLivenessInfo;
@@ -207,6 +214,11 @@ public class BTreeRow extends AbstractRow
         return searchIterator();
     }
 
+    public int size()
+    {
+        return BTree.size(btree);
+    }
+
     public Iterable<Cell> cells()
     {
         return CellIterator::new;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index f9a3026..4176ba6 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -218,28 +218,21 @@ public class BufferCell extends AbstractCell
      */
     static class Serializer implements Cell.Serializer
     {
-        private final static int PRESENCE_MASK               = 0x01; // Marks the actual presence of a cell. This is used only when serialized on-disk and
-                                                                     // on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag set).
-        private final static int IS_DELETED_MASK             = 0x02; // Whether the cell is a tombstone or not.
-        private final static int IS_EXPIRING_MASK            = 0x04; // Whether the cell is expiring.
-        private final static int HAS_EMPTY_VALUE_MASK        = 0x08; // Wether the cell has an empty value. This will be the case for tombstone in particular.
-        private final static int USE_ROW_TIMESTAMP_MASK      = 0x10; // Wether the cell has the same timestamp than the row this is a cell of.
-        private final static int USE_ROW_TTL_MASK            = 0x20; // Wether the cell has the same ttl than the row this is a cell of.
+        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
+        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
+        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Whether the cell has the same timestamp as the row it is a cell of.
+        private final static int USE_ROW_TTL_MASK            = 0x10; // Whether the cell has the same ttl as the row it is a cell of.
 
         public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
         {
-            if (cell == null)
-            {
-                out.writeByte((byte)0);
-                return;
-            }
-
+            assert cell != null;
             boolean hasValue = cell.value().hasRemaining();
             boolean isDeleted = cell.isTombstone();
             boolean isExpiring = cell.isExpiring();
             boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
             boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-            int flags = PRESENCE_MASK;
+            int flags = 0;
             if (!hasValue)
                 flags |= HAS_EMPTY_VALUE_MASK;
 
@@ -273,9 +266,6 @@ public class BufferCell extends AbstractCell
         public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
         {
             int flags = in.readUnsignedByte();
-            if ((flags & PRESENCE_MASK) == 0)
-                return null;
-
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
             boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
             boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
@@ -317,10 +307,6 @@ public class BufferCell extends AbstractCell
         public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
         {
             long size = 1; // flags
-
-            if (cell == null)
-                return size;
-
             boolean hasValue = cell.value().hasRemaining();
             boolean isDeleted = cell.isTombstone();
             boolean isExpiring = cell.isExpiring();
@@ -348,9 +334,6 @@ public class BufferCell extends AbstractCell
         public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
         {
             int flags = in.readUnsignedByte();
-            if ((flags & PRESENCE_MASK) == 0)
-                return false;
-
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
             boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
             boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
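
With the presence bit gone, a serialized cell always denotes a present cell and
the remaining masks shift down one position. A standalone sketch of decoding a
flag byte under the new masks (the constants mirror those defined above; this
is not the serializer itself):

    public class CellFlagsSketch
    {
        static final int IS_DELETED_MASK        = 0x01;
        static final int IS_EXPIRING_MASK       = 0x02;
        static final int HAS_EMPTY_VALUE_MASK   = 0x04;
        static final int USE_ROW_TIMESTAMP_MASK = 0x08;
        static final int USE_ROW_TTL_MASK       = 0x10;

        static String describe(int flags)
        {
            boolean hasValue        = (flags & HAS_EMPTY_VALUE_MASK) == 0; // the mask marks *empty* values
            boolean isDeleted       = (flags & IS_DELETED_MASK) != 0;
            boolean isExpiring      = (flags & IS_EXPIRING_MASK) != 0;
            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
            boolean useRowTTL       = (flags & USE_ROW_TTL_MASK) != 0;
            return String.format("hasValue=%b deleted=%b expiring=%b rowTs=%b rowTtl=%b",
                                 hasValue, isDeleted, isExpiring, useRowTimestamp, useRowTTL);
        }

        public static void main(String[] args)
        {
            // a live cell with a value, sharing the row's timestamp
            System.out.println(describe(USE_ROW_TIMESTAMP_MASK));
            // a tombstone: deleted, and tombstones carry no value
            System.out.println(describe(IS_DELETED_MASK | HAS_EMPTY_VALUE_MASK));
        }
    }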

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index efa40ad..5f1a749 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -30,8 +30,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * <p>
  * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
  * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
- * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
- * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * delta-encode that information for the sake of vint encoding.
  * <p>
 * Note that due to their use, those stats can be somewhat inaccurate (the more inaccurate
 * they are, the less effective the storage will be, but provided the stats are not completely wacky,
@@ -63,7 +62,7 @@ public class EncodingStats
     }
 
     // We should use this sparingly obviously
-    public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH, -1);
+    public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
 
     public static final Serializer serializer = new Serializer();
 
@@ -71,13 +70,9 @@ public class EncodingStats
     public final int minLocalDeletionTime;
     public final int minTTL;
 
-    // Will be < 0 if the value is unknown
-    public final int avgColumnSetPerRow;
-
     public EncodingStats(long minTimestamp,
                          int minLocalDeletionTime,
-                         int minTTL,
-                         int avgColumnSetPerRow)
+                         int minTTL)
     {
         // Note that the exact values of those don't impact correctness, just the efficiency of the encoding. So when we
         // get a value for timestamp (resp. minLocalDeletionTime) that means 'no object had a timestamp' (resp. 'a local
@@ -87,7 +82,6 @@ public class EncodingStats
         this.minTimestamp = minTimestamp == LivenessInfo.NO_TIMESTAMP ? TIMESTAMP_EPOCH : minTimestamp;
         this.minLocalDeletionTime = minLocalDeletionTime == LivenessInfo.NO_EXPIRATION_TIME ? DELETION_TIME_EPOCH : minLocalDeletionTime;
         this.minTTL = minTTL;
-        this.avgColumnSetPerRow = avgColumnSetPerRow;
     }
 
     /**
@@ -110,11 +104,7 @@ public class EncodingStats
                    ? that.minTTL
                    : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
 
-        int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
-                               ? that.avgColumnSetPerRow
-                               : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
-
-        return new EncodingStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+        return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
     @Override
@@ -125,8 +115,7 @@ public class EncodingStats
 
         EncodingStats that = (EncodingStats) o;
 
-        return this.avgColumnSetPerRow == that.avgColumnSetPerRow
-            && this.minLocalDeletionTime == that.minLocalDeletionTime
+        return this.minLocalDeletionTime == that.minLocalDeletionTime
             && this.minTTL == that.minTTL
             && this.minTimestamp == that.minTimestamp;
     }
@@ -134,13 +123,13 @@ public class EncodingStats
     @Override
     public int hashCode()
     {
-        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
     }
 
     @Override
     public String toString()
     {
-        return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d)", minTimestamp, minLocalDeletionTime, minTTL);
     }
 
     public static class Collector implements PartitionStatisticsCollector
@@ -237,8 +226,7 @@ public class EncodingStats
         {
             return new EncodingStats(isTimestampSet ? minTimestamp : TIMESTAMP_EPOCH,
                                      isDelTimeSet ? minDeletionTime : DELETION_TIME_EPOCH,
-                                     isTTLSet ? minTTL : TTL_EPOCH,
-                                     isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+                                     isTTLSet ? minTTL : TTL_EPOCH);
         }
 
         public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
@@ -260,15 +248,13 @@ public class EncodingStats
             out.writeVInt(stats.minTimestamp - TIMESTAMP_EPOCH);
             out.writeVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH);
             out.writeVInt(stats.minTTL - TTL_EPOCH);
-            out.writeVInt(stats.avgColumnSetPerRow);
         }
 
         public int serializedSize(EncodingStats stats)
         {
             return TypeSizes.sizeofVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH)
-                 + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
+                   + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
+                   + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH);
         }
 
         public EncodingStats deserialize(DataInputPlus in) throws IOException
@@ -276,8 +262,7 @@ public class EncodingStats
             long minTimestamp = in.readVInt() + TIMESTAMP_EPOCH;
             int minLocalDeletionTime = (int)in.readVInt() + DELETION_TIME_EPOCH;
             int minTTL = (int)in.readVInt() + TTL_EPOCH;
-            int avgColumnSetPerRow = (int)in.readVInt();
-            return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+            return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
         }
     }
 }
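
The surviving stats exist purely to shrink vints: every timestamp, deletion
time and TTL in the sstable is written as a delta from the corresponding
minimum, and small deltas encode in few bytes. A rough sketch of the effect,
using a LEB128-style sizer as a stand-in for Cassandra's vint format (which
differs in detail but not in spirit):

    public class DeltaEncodingSketch
    {
        // bytes needed for a LEB128-style unsigned varint
        static int varintSize(long v)
        {
            int size = 1;
            while ((v >>>= 7) != 0)
                size++;
            return size;
        }

        public static void main(String[] args)
        {
            long minTimestamp = 1440745437000000L; // per-sstable minimum, in microseconds
            long[] cellTimestamps = { 1440745437000123L, 1440745438250000L };

            for (long ts : cellTimestamps)
                System.out.printf("raw=%d bytes, delta=%d bytes%n",
                                  varintSize(ts), varintSize(ts - minTimestamp));
        }
    }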

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 996e89a..003dd04 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * it's own data. For instance, a {@code Row} cannot contains a cell that is deleted by its own
  * row deletion.
  */
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Collection<ColumnData>
 {
     /**
      * The clustering values for this row.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 60f0dcb..48e00f9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -98,6 +98,8 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
     {
+        PartitionColumns columns = staticRow == null ? PartitionColumns.NONE
+                                                     : new PartitionColumns(staticRow.columns(), Columns.NONE);
         return new UnfilteredRowIterator()
         {
             public CFMetaData metadata()
@@ -112,7 +114,7 @@ public abstract class UnfilteredRowIterators
 
             public PartitionColumns columns()
             {
-                return PartitionColumns.NONE;
+                return columns;
             }
 
             public DecoratedKey partitionKey()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 14b06cf..0866810 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,11 +19,12 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 
+import com.google.common.collect.Collections2;
+
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.SearchIterator;
 
 /**
  * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -37,29 +38,20 @@ import org.apache.cassandra.utils.SearchIterator;
 *       {@code Clustering.serializer}. Note that static rows are an exception and
  *       don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
  *       whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
- *       complex ones.  There is actually 2 slightly different possible layout for those
- *       cell: a dense one and a sparse one. Which one is used depends on the serialization
- *       header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}:
- *         1) in the dense layout, there will be as many <sci> and <ccj> as there is columns
- *            in the serialization header. *Each simple column <sci> will simply be a <cell>
- *            (which might have no value, see below), while each <ccj> will be
- *             [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
- *             this complex column (if flags indicates it present), <celln> are the <cell>
- *             for this complex column and <emptyCell> is a last cell that will have no value
- *             to indicate the end of this column.
- *         2) in the sparse layout, there won't be "empty" cells, i.e. only the column that
- *            actually have a cell are represented. For that, each <sci> and <ccj> start
- *            by a 2 byte index that points to the column in the header it belongs to. After
- *            that, each <sci> and <ccj> is the same than for the dense layout. But contrarily
- *            to the dense layout we won't know how many elements are serialized so a 2 byte
- *            marker with a value of -1 will indicates the end of the row.
+ *       complex ones.
+ *       The columns for the row are then serialized if they differ from those in the header,
+ *       and each cell then follows:
+ *         * Each simple column <sci> will simply be a <cell>
+ *           (which might have no value, see below),
+ *         * Each <ccj> will be [<delTime>]<n><cell1>...<celln> where <delTime>
+ *           is the deletion for this complex column (if the flags indicate it is
+ *           present), <n> is the vint encoded number of cells that follow, and
+ *           each <celli> is a <cell> of this complex column
  *   <marker> is <bound><deletion> where <bound> is the marker bound as serialized
  *       by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
  *       time.
  *
- *   <cell> A cell start with a 1 byte <flag>. Thre rightmost 1st bit indicates
- *       if there is actually a value for this cell. If this flag is unset,
- *       nothing more follows for the cell. The 2nd and third flag indicates if
+ *   <cell> A cell starts with a 1 byte <flag>. The 1st and 2nd flag bits indicate
 *       whether it's a deleted or expiring cell. The 3rd flag indicates if the value
 *       is empty or not. The 4th and 5th indicate if the timestamp and ttl/
 *       localDeletionTime for the cell are the same as the row one (if that
@@ -85,6 +77,7 @@ public class UnfilteredSerializer
     private final static int HAS_TTL              = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
     private final static int HAS_DELETION         = 0x20; // Whether the encoded row has some deletion info.
     private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
+    private final static int HAS_ALL_COLUMNS      = 0x80; // Whether the encoded row has all of the columns from the header present
 
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
     throws IOException
@@ -105,9 +98,11 @@ public class UnfilteredSerializer
         int flags = 0;
         boolean isStatic = row.isStatic();
 
+        Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         DeletionTime deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
+        boolean hasAllColumns = (row.size() == headerColumns.size());
 
         if (isStatic)
             flags |= IS_STATIC;
@@ -119,6 +114,8 @@ public class UnfilteredSerializer
             flags |= HAS_DELETION;
         if (hasComplexDeletion)
             flags |= HAS_COMPLEX_DELETION;
+        if (hasAllColumns)
+            flags |= HAS_ALL_COLUMNS;
 
         out.writeByte((byte)flags);
         if (!isStatic)
@@ -134,55 +131,27 @@ public class UnfilteredSerializer
         if ((flags & HAS_DELETION) != 0)
             header.writeDeletionTime(deletion, out);
 
-        Columns columns = header.columns(isStatic);
-        int simpleCount = columns.simpleColumnCount();
-        boolean useSparse = header.useSparseColumnLayout(isStatic);
-        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
-        for (int i = 0; i < simpleCount; i++)
-            writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse);
+        if (!hasAllColumns)
+            Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
 
-        for (int i = simpleCount; i < columns.columnCount(); i++)
-            writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse);
-
-        if (useSparse)
-            out.writeVInt(-1);
-    }
-
-    private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
-    throws IOException
-    {
-        if (useSparse)
+        for (ColumnData data : row)
         {
-            if (cell == null)
-                return;
-
-            out.writeVInt(idx);
+            if (data.column.isSimple())
+                Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
+            else
+                writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
         }
-        Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
+    private void writeComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out)
     throws IOException
     {
-        if (useSparse)
-        {
-            if (data == null)
-                return;
-
-            out.writeVInt(idx);
-        }
-
         if (hasComplexDeletion)
             header.writeDeletionTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), out);
 
-        if (data != null)
-        {
-            for (Cell cell : data)
-                Cell.serializer.serialize(cell, out, rowLiveness, header);
-        }
-
-        Cell.serializer.serialize(null, out, rowLiveness, header);
+        out.writeUnsignedVInt(data.cellsCount());
+        for (Cell cell : data)
+            Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
     public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
@@ -215,9 +184,11 @@ public class UnfilteredSerializer
         long size = 1; // flags
 
         boolean isStatic = row.isStatic();
+        Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         DeletionTime deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
+        boolean hasAllColumns = (row.size() == headerColumns.size());
 
         if (!isStatic)
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
@@ -232,57 +203,32 @@ public class UnfilteredSerializer
         if (!deletion.isLive())
             size += header.deletionTimeSerializedSize(deletion);
 
-        Columns columns = header.columns(isStatic);
-        int simpleCount = columns.simpleColumnCount();
-        boolean useSparse = header.useSparseColumnLayout(isStatic);
-        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
-        for (int i = 0; i < simpleCount; i++)
-            size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse);
-
-        for (int i = simpleCount; i < columns.columnCount(); i++)
-            size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse);
-
-        if (useSparse)
-            size += TypeSizes.sizeofVInt(-1);
+        if (!hasAllColumns)
+            size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
 
-        return size;
-    }
-
-    private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
-    {
-        long size = 0;
-        if (useSparse)
+        for (ColumnData data : row)
         {
-            if (cell == null)
-                return size;
-
-            size += TypeSizes.sizeofVInt(idx);
+            if (data.column.isSimple())
+                size += Cell.serializer.serializedSize((Cell) data, pkLiveness, header);
+            else
+                size += sizeOfComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header);
         }
-        return size + Cell.serializer.serializedSize(cell, rowLiveness, header);
+
+        return size;
     }
 
-    private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
+    private long sizeOfComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
     {
         long size = 0;
-        if (useSparse)
-        {
-            if (data == null)
-                return size;
-
-            size += TypeSizes.sizeofVInt(idx);
-        }
 
         if (hasComplexDeletion)
             size += header.deletionTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion());
 
-        if (data != null)
-        {
-            for (Cell cell : data)
-                size += Cell.serializer.serializedSize(cell, rowLiveness, header);
-        }
+        size += TypeSizes.sizeofUnsignedVInt(data.cellsCount());
+        for (Cell cell : data)
+            size += Cell.serializer.serializedSize(cell, rowLiveness, header);
 
-        return size + Cell.serializer.serializedSize(null, rowLiveness, header);
+        return size;
     }
 
     public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
@@ -369,6 +315,8 @@ public class UnfilteredSerializer
             boolean hasTTL = (flags & HAS_TTL) != 0;
             boolean hasDeletion = (flags & HAS_DELETION) != 0;
             boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+            boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+            Columns headerColumns = header.columns(isStatic);
 
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
@@ -382,33 +330,16 @@ public class UnfilteredSerializer
             builder.addPrimaryKeyLivenessInfo(rowLiveness);
             builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
 
-            Columns columns = header.columns(isStatic);
-            if (header.useSparseColumnLayout(isStatic))
-            {
-                int count = columns.columnCount();
-                int simpleCount = columns.simpleColumnCount();
-                int i;
-                while ((i = (int)in.readVInt()) >= 0)
-                {
-                    if (i > count)
-                        throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
-                    if (i < simpleCount)
-                        readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-                    else
-                        readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
-                }
-            }
-            else
+            Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+            for (ColumnDefinition column : columns)
             {
-                for (int i = 0; i < columns.simpleColumnCount(); i++)
-                    readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-
-                for (int i = 0; i < columns.complexColumnCount(); i++)
-                    readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+                if (column.isSimple())
+                    readSimpleColumn(column, in, header, helper, builder, rowLiveness);
+                else
+                    readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
             }
 
-                return builder.build();
+            return builder.build();
         }
         catch (RuntimeException | AssertionError e)
         {
@@ -426,7 +357,7 @@ public class UnfilteredSerializer
         if (helper.includes(column))
         {
             Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
-            if (cell != null && !helper.isDropped(cell, false))
+            if (!helper.isDropped(cell, false))
                 builder.addCell(cell);
         }
         else
@@ -448,9 +379,10 @@ public class UnfilteredSerializer
                     builder.addComplexDeletion(column, complexDeletion);
             }
 
-            Cell cell;
-            while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null)
+            int count = (int) in.readUnsignedVInt();
+            while (--count >= 0)
             {
+                Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
                 if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
                     builder.addCell(cell);
             }
@@ -470,6 +402,8 @@ public class UnfilteredSerializer
         boolean hasTTL = (flags & HAS_TTL) != 0;
         boolean hasDeletion = (flags & HAS_DELETION) != 0;
         boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+        boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+        Columns headerColumns = header.columns(isStatic);
 
         // Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
         // the size we think due to VINT encoding
@@ -483,30 +417,13 @@ public class UnfilteredSerializer
         if (hasDeletion)
             header.skipDeletionTime(in);
 
-        Columns columns = header.columns(isStatic);
-        if (header.useSparseColumnLayout(isStatic))
-        {
-            int count = columns.columnCount();
-            int simpleCount = columns.simpleColumnCount();
-            int i;
-            while ((i = (int)in.readVInt()) >= 0)
-            {
-                if (i > count)
-                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
-                if (i < simpleCount)
-                    Cell.serializer.skip(in, columns.getSimple(i), header);
-                else
-                    skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion);
-            }
-        }
-        else
+        Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+        for (ColumnDefinition column : columns)
         {
-            for (int i = 0; i < columns.simpleColumnCount(); i++)
-                Cell.serializer.skip(in, columns.getSimple(i), header);
-
-            for (int i = 0; i < columns.complexColumnCount(); i++)
-                skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion);
+            if (column.isSimple())
+                Cell.serializer.skip(in, column, header);
+            else
+                skipComplexColumn(in, column, header, hasComplexDeletion);
         }
     }
 
@@ -536,7 +453,9 @@ public class UnfilteredSerializer
         if (hasComplexDeletion)
             header.skipDeletionTime(in);
 
-        while (Cell.serializer.skip(in, column, header));
+        int count = (int) in.readUnsignedVInt();
+        while (--count >= 0)
+            Cell.serializer.skip(in, column, header);
     }
 
     public static boolean isEndOfPartition(int flags)

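For readers following the UnfilteredSerializer hunks above: the sstable-wide sparse/dense switch is replaced by a per-row column subset. A row containing every header column just sets the HAS_ALL_COLUMNS flag; any other row serializes the subset of columns it actually holds via Columns.serializer (serializedSubsetSize on the write path, deserializeSubset on the read path), which for headers of at most 64 columns reduces to a single vint-encoded bitmap. Below is a minimal, self-contained sketch of that bitmap idea only; the class name, the String column names, and the bit convention (setting bits for absent columns so a complete row encodes as 0) are illustrative assumptions, not the actual Columns.serializer code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of a <=64-column subset bitmap (illustrative; not Cassandra's code).
    public class SubsetBitmapSketch
    {
        // Bit i set => the i-th header column is absent from the row, so a row
        // containing all header columns encodes as 0, the cheapest vint.
        static long encode(List<String> headerColumns, List<String> rowColumns)
        {
            assert headerColumns.size() <= 64;
            long bitmap = 0;
            for (int i = 0; i < headerColumns.size(); i++)
                if (!rowColumns.contains(headerColumns.get(i)))
                    bitmap |= 1L << i;
            return bitmap; // would then be written with writeUnsignedVInt
        }

        static List<String> decode(List<String> headerColumns, long bitmap)
        {
            List<String> present = new ArrayList<>();
            for (int i = 0; i < headerColumns.size(); i++)
                if ((bitmap & (1L << i)) == 0)
                    present.add(headerColumns.get(i));
            return present;
        }

        public static void main(String[] args)
        {
            List<String> header = Arrays.asList("a", "b", "c", "d");
            long bitmap = encode(header, Arrays.asList("a", "c"));
            System.out.println(Long.toBinaryString(bitmap)); // 1010 (b and d absent)
            System.out.println(decode(header, bitmap));      // [a, c]
        }
    }

Note also that complex columns are now length-prefixed with an unsigned vint cell count rather than terminated by a null cell, which is what lets readComplexColumn and skipComplexColumn above loop a fixed number of times instead of probing for a sentinel.
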
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 793cd81..acdf6bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -127,7 +127,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
     {
         // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
     }
 
     public int compareTo(SSTableIdentityIterator o)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 1a91c3d..c080948 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -83,9 +83,9 @@ public class ColumnsTest
 
     private void assertSubset(Columns superset, Columns subset)
     {
-        Assert.assertTrue(superset.contains(superset));
-        Assert.assertTrue(superset.contains(subset));
-        Assert.assertFalse(subset.contains(superset));
+        Assert.assertTrue(superset.containsAll(superset));
+        Assert.assertTrue(superset.containsAll(subset));
+        Assert.assertFalse(subset.containsAll(superset));
     }
 
     @Test
@@ -275,7 +275,7 @@ public class ColumnsTest
             Columns subset = columns;
             for (ColumnDefinition def : remove)
                 subset = subset.without(def);
-            Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+            Assert.assertEquals(columns.size() - remove.size(), subset.size());
             List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
             remainingDefs.removeAll(remove);
             return new ColumnsCheck(subset, remainingDefs);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index f651093..f0a63a8 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -105,7 +105,7 @@ public class PartitionTest
 
         CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
 
-        assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
+        assertEquals(partition.columns().regulars.size(), deserialized.columns().regulars.size());
         assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
         assertTrue(deserialized.columns().regulars.getSimple(5).equals(partition.columns().regulars.getSimple(5)));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 3aaf21f..cd80a2f 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -141,7 +141,7 @@ public class RowTest
         row = (Row) unfiltered;
         assertEquals("a2", defA.cellValueType().getString(row.getCell(defA).value()));
         assertEquals("b1", defB.cellValueType().getString(row.getCell(defB).value()));
-        assertEquals(2, row.columns().columnCount());
+        assertEquals(2, row.columns().size());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 5887fd4..e9bf4c5 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -325,8 +326,9 @@ public class ScrubTest
                                                                    keys.size(),
                                                                    0L,
                                                                    0,
-                                                                   SerializationHeader.make(cfs.metadata,
-                                                                                            Collections.emptyList())))
+                                                                   new SerializationHeader(cfs.metadata,
+                                                                                           cfs.metadata.partitionColumns(),
+                                                                                           EncodingStats.NO_STATS)))
             {
 
                 for (String k : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 89c0d61..7826317 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -179,9 +179,17 @@ public class SSTableUtils
 
         public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
         {
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+            for (PartitionUpdate update : sorted.values())
+                builder.addAll(update.columns());
             final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
             return write(sorted.size(), new Appender()
             {
+                public SerializationHeader header()
+                {
+                    return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+                }
+
                 @Override
                 public boolean append(SSTableTxnWriter writer) throws IOException
                 {
@@ -207,7 +215,7 @@ public class SSTableUtils
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
             CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
             ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
-            SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+            SerializationHeader header = appender.header();
             SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
             while (appender.append(writer)) { /* pass */ }
             Collection<SSTableReader> readers = writer.finish(true);
@@ -223,6 +231,7 @@ public class SSTableUtils
 
     public static abstract class Appender
     {
+        public abstract SerializationHeader header();
         /** Called with an open writer until it returns false. */
         public abstract boolean append(SSTableTxnWriter writer) throws IOException;
     }
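
A closing note on the test changes: since row columns are now encoded as a subset of the SerializationHeader's columns, the header handed to the writer must cover every column that can appear in any appended row. That is why Appender gains an abstract header() and SSTableUtils builds one from the union of the update columns instead of SerializationHeader.make(cfm, Collections.EMPTY_LIST). In sketch form, mirroring the diff above, with updates and metadata as stand-in names for whatever the caller has at hand:

    // Build a header whose column set is a superset of all rows to be written.
    PartitionColumns.Builder builder = PartitionColumns.builder();
    for (PartitionUpdate update : updates)
        builder.addAll(update.columns());
    SerializationHeader header =
        new SerializationHeader(metadata, builder.build(), EncodingStats.NO_STATS);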