You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:40 UTC

[11/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
index 54feb85..33a0917 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -26,6 +25,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -146,7 +146,7 @@ public interface ClusteringIndexFilter
     public interface Serializer
     {
         public void serialize(ClusteringIndexFilter filter, DataOutputPlus out, int version) throws IOException;
-        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException;
+        public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException;
         public long serializedSize(ClusteringIndexFilter filter, int version);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index f2cc46f..13329f3 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 
@@ -27,6 +26,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -94,6 +94,9 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
 
     public boolean isFullyCoveredBy(CachedPartition partition)
     {
+        if (partition.isEmpty())
+            return false;
+
         // 'partition' contains all columns, so it covers our filter if our last clusterings
         // is smaller than the last in the cache
         return clusterings.comparator().compare(clusterings.last(), partition.lastRow().clustering()) <= 0;
@@ -109,18 +112,18 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
     {
         // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
         // the range extend) and it's harmless to left them.
-        return new FilteringRowIterator(iterator)
+        return new AlteringUnfilteredRowIterator(iterator)
         {
             @Override
-            public FilteringRow makeRowFilter()
+            public Row computeNextStatic(Row row)
             {
-                return FilteringRow.columnsFilteringRow(columnFilter);
+                return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
             }
 
             @Override
-            protected boolean includeRow(Row row)
+            public Row computeNext(Row row)
             {
-                return clusterings.contains(row.clustering());
+                return clusterings.contains(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
             }
         };
     }
@@ -214,7 +217,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
             sb.append(i++ == 0 ? "" : ", ").append(clustering.toString(metadata));
         if (reversed)
             sb.append(", reversed");
-        return sb.append(")").toString();
+        return sb.append(')').toString();
     }
 
     public String toCQLString(CFMetaData metadata)
@@ -223,7 +226,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
             return "";
 
         StringBuilder sb = new StringBuilder();
-        sb.append("(").append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(")");
+        sb.append('(').append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(')');
         sb.append(clusterings.size() == 1 ? " = " : " IN (");
         int i = 0;
         for (Clustering clustering : clusterings)
@@ -258,13 +261,13 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
 
     private static class NamesDeserializer extends InternalDeserializer
     {
-        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+        public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
         {
             ClusteringComparator comparator = metadata.comparator;
             BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(comparator);
             int size = in.readInt();
             for (int i = 0; i < size; i++)
-                clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()).takeAlias());
+                clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()));
 
             return new ClusteringIndexNamesFilter(clusterings.build(), reversed);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 8fb319e..4f0e4e2 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.List;
 import java.nio.ByteBuffer;
@@ -28,6 +27,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -91,24 +91,24 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
 
         // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
         // the range extend) and it's harmless to leave them.
-        return new FilteringRowIterator(iterator)
+        return new AlteringUnfilteredRowIterator(iterator)
         {
             @Override
-            public FilteringRow makeRowFilter()
+            public boolean hasNext()
             {
-                return FilteringRow.columnsFilteringRow(columnFilter);
+                return !tester.isDone() && super.hasNext();
             }
 
             @Override
-            protected boolean includeRow(Row row)
+            public Row computeNextStatic(Row row)
             {
-                return tester.includes(row.clustering());
+                return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
             }
 
             @Override
-            public boolean hasNext()
+            public Row computeNext(Row row)
             {
-                return !tester.isDone() && super.hasNext();
+                return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
             }
         };
     }
@@ -170,7 +170,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
 
     private static class SliceDeserializer extends InternalDeserializer
     {
-        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+        public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
         {
             Slices slices = Slices.serializer.deserialize(in, version, metadata);
             return new ClusteringIndexSliceFilter(slices, reversed);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b98108d..084bad6 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 
@@ -30,8 +29,8 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -52,15 +51,6 @@ public class ColumnFilter
 {
     public static final Serializer serializer = new Serializer();
 
-    private static final Comparator<ColumnSubselection> valueComparator = new Comparator<ColumnSubselection>()
-    {
-        public int compare(ColumnSubselection s1, ColumnSubselection s2)
-        {
-            assert s1.column().name.equals(s2.column().name);
-            return s1.column().cellPathComparator().compare(s1.minIncludedPath(), s2.minIncludedPath());
-        }
-    };
-
     // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
     // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
     // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
@@ -115,6 +105,11 @@ public class ColumnFilter
         return isFetchAll ? metadata.partitionColumns() : selection;
     }
 
+    public boolean includesAllColumns()
+    {
+        return isFetchAll;
+    }
+
     /**
      * Whether the provided column is selected by this selection.
      */
@@ -144,7 +139,7 @@ public class ColumnFilter
             return true;
 
         for (ColumnSubselection subSel : s)
-            if (subSel.includes(cell.path()))
+            if (subSel.compareInclusionOf(cell.path()) == 0)
                 return true;
 
         return false;
@@ -163,7 +158,7 @@ public class ColumnFilter
             return false;
 
         for (ColumnSubselection subSel : s)
-            if (subSel.includes(path))
+            if (subSel.compareInclusionOf(path) == 0)
                 return false;
 
         return true;
@@ -182,7 +177,7 @@ public class ColumnFilter
         if (s.isEmpty())
             return null;
 
-        return new Tester(s.iterator());
+        return new Tester(isFetchAll, s.iterator());
     }
 
     /**
@@ -205,46 +200,43 @@ public class ColumnFilter
 
     public static class Tester
     {
+        private final boolean isFetchAll;
         private ColumnSubselection current;
         private final Iterator<ColumnSubselection> iterator;
 
-        private Tester(Iterator<ColumnSubselection> iterator)
+        private Tester(boolean isFetchAll, Iterator<ColumnSubselection> iterator)
         {
+            this.isFetchAll = isFetchAll;
             this.iterator = iterator;
         }
 
         public boolean includes(CellPath path)
         {
-            while (current == null)
-            {
-                if (!iterator.hasNext())
-                    return false;
-
-                current = iterator.next();
-                if (current.includes(path))
-                    return true;
-
-                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
-                    current = null;
-            }
-            return false;
+            return isFetchAll || includedBySubselection(path);
         }
 
         public boolean canSkipValue(CellPath path)
         {
-            while (current == null)
+            return isFetchAll && !includedBySubselection(path);
+        }
+
+        private boolean includedBySubselection(CellPath path)
+        {
+            while (current != null || iterator.hasNext())
             {
-                if (!iterator.hasNext())
-                    return false;
+                if (current == null)
+                    current = iterator.next();
 
-                current = iterator.next();
-                if (current.includes(path))
+                int cmp = current.compareInclusionOf(path);
+                if (cmp == 0) // The path is included
+                    return true;
+                else if (cmp < 0) // The path is before this sub-selection, it's not included by any
                     return false;
 
-                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
-                    current = null;
+                // the path is after this sub-selection, we need to check the next one.
+                current = null;
             }
-            return true;
+            return false;
         }
     }
 
@@ -302,7 +294,7 @@ public class ColumnFilter
             SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null;
             if (subSelections != null)
             {
-                s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator);
+                s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
                 for (ColumnSubselection subSelection : subSelections)
                     s.put(subSelection.column().name, subSelection);
             }
@@ -317,6 +309,9 @@ public class ColumnFilter
         if (selection == null)
             return "*";
 
+        if (selection.isEmpty())
+            return "";
+
         Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
         StringBuilder sb = new StringBuilder();
         appendColumnDef(sb, defs.next());
@@ -351,7 +346,7 @@ public class ColumnFilter
         private static final int HAS_SELECTION_MASK      = 0x02;
         private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
 
-        private int makeHeaderByte(ColumnFilter selection)
+        private static int makeHeaderByte(ColumnFilter selection)
         {
             return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
                  | (selection.selection != null ? HAS_SELECTION_MASK : 0)
@@ -376,7 +371,7 @@ public class ColumnFilter
             }
         }
 
-        public ColumnFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
         {
             int header = in.readUnsignedByte();
             boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
@@ -394,7 +389,7 @@ public class ColumnFilter
             SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
             if (hasSubSelections)
             {
-                subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator);
+                subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
                 int size = in.readUnsignedShort();
                 for (int i = 0; i < size; i++)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
index 652e27c..e45dbee 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -38,7 +38,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  * This only make sense for complex column. For those, this allow for instance
  * to select only a slice of a map.
  */
-public abstract class ColumnSubselection
+public abstract class ColumnSubselection implements Comparable<ColumnSubselection>
 {
     public static final Serializer serializer = new Serializer();
 
@@ -72,9 +72,19 @@ public abstract class ColumnSubselection
 
     protected abstract Kind kind();
 
-    public abstract CellPath minIncludedPath();
-    public abstract CellPath maxIncludedPath();
-    public abstract boolean includes(CellPath path);
+    protected abstract CellPath comparisonPath();
+
+    public int compareTo(ColumnSubselection other)
+    {
+        assert other.column().name.equals(column().name);
+        return column().cellPathComparator().compare(comparisonPath(), other.comparisonPath());
+    }
+
+    /**
+     * Given a path, return -1 if the path is before anything selected by this subselection, 0 if it is selected by this
+     * subselection and 1 if the path is after anything selected by this subselection.
+     */
+    public abstract int compareInclusionOf(CellPath path);
 
     private static class Slice extends ColumnSubselection
     {
@@ -93,20 +103,20 @@ public abstract class ColumnSubselection
             return Kind.SLICE;
         }
 
-        public CellPath minIncludedPath()
+        public CellPath comparisonPath()
         {
             return from;
         }
 
-        public CellPath maxIncludedPath()
-        {
-            return to;
-        }
-
-        public boolean includes(CellPath path)
+        public int compareInclusionOf(CellPath path)
         {
             Comparator<CellPath> cmp = column.cellPathComparator();
-            return cmp.compare(from, path) <= 0 && cmp.compare(path, to) <= 0;
+            if (cmp.compare(path, from) < 0)
+                return -1;
+            else if (cmp.compare(to, path) < 0)
+                return 1;
+            else
+                return 0;
         }
 
         @Override
@@ -133,20 +143,14 @@ public abstract class ColumnSubselection
             return Kind.ELEMENT;
         }
 
-        public CellPath minIncludedPath()
-        {
-            return element;
-        }
-
-        public CellPath maxIncludedPath()
+        public CellPath comparisonPath()
         {
             return element;
         }
 
-        public boolean includes(CellPath path)
+        public int compareInclusionOf(CellPath path)
         {
-            Comparator<CellPath> cmp = column.cellPathComparator();
-            return cmp.compare(element, path) == 0;
+            return column.cellPathComparator().compare(path, element);
         }
 
         @Override
@@ -180,7 +184,7 @@ public abstract class ColumnSubselection
             throw new AssertionError();
         }
 
-        public ColumnSubselection deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        public ColumnSubselection deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
         {
             ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
             ColumnDefinition column = metadata.getColumnDefinition(name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 76e29ec..206afa4 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -115,8 +115,7 @@ public abstract class DataLimits
      * The max number of results this limits enforces.
      * <p>
      * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
-     * thrift, it means cells. The {@link #countsCells} allows to distinguish between the two cases if
-     * needed.
+     * thrift, it means cells.
      *
      * @return the maximum number of results this limits enforces.
      */
@@ -124,8 +123,6 @@ public abstract class DataLimits
 
     public abstract int perPartitionCount();
 
-    public abstract boolean countsCells();
-
     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
     {
         return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false));
@@ -269,11 +266,6 @@ public abstract class DataLimits
             return perPartitionLimit;
         }
 
-        public boolean countsCells()
-        {
-            return false;
-        }
-
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
@@ -353,7 +345,7 @@ public abstract class DataLimits
             {
                 sb.append("LIMIT ").append(rowLimit);
                 if (perPartitionLimit != Integer.MAX_VALUE)
-                    sb.append(" ");
+                    sb.append(' ');
             }
 
             if (perPartitionLimit != Integer.MAX_VALUE)
@@ -511,11 +503,6 @@ public abstract class DataLimits
             return cellPerPartitionLimit;
         }
 
-        public boolean countsCells()
-        {
-            return true;
-        }
-
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // remember that getMeansColumns returns a number of cells: we should clean nomenclature
@@ -572,7 +559,7 @@ public abstract class DataLimits
 
             public void newRow(Row row)
             {
-                for (Cell cell : row)
+                for (Cell cell : row.cells())
                 {
                     if (assumeLiveData || cell.isLive(nowInSec))
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 8f34efb..5a49bca 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -137,11 +137,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         if (metadata.isCompound())
         {
             List<ByteBuffer> values = CompositeType.splitName(name);
-            return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+            return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
         }
         else
         {
-            return new SimpleClustering(name);
+            return new Clustering(name);
         }
     }
 
@@ -165,28 +165,18 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             super(expressions);
         }
 
-        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
         {
             if (expressions.isEmpty())
                 return iter;
 
-            return new WrappingUnfilteredPartitionIterator(iter)
+            return new AlteringUnfilteredPartitionIterator(iter)
             {
-                @Override
-                public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+                protected Row computeNext(DecoratedKey partitionKey, Row row)
                 {
-                    return new FilteringRowIterator(iter)
-                    {
-                        // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
-                        // (we should however not filter them in the output of the method, hence it's not used as row filter for the
-                        // FilteringRowIterator)
-                        private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec);
-
-                        protected boolean includeRow(Row row)
-                        {
-                            return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row));
-                        }
-                    };
+                    // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
+                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+                    return purged != null && CQLFilter.this.isSatisfiedBy(partitionKey, purged) ? row : null;
                 }
             };
         }
@@ -515,10 +505,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     CollectionType<?> type = (CollectionType<?>)column.type;
                     if (column.isComplex())
                     {
-                        Iterator<Cell> iter = row.getCells(column);
-                        while (iter.hasNext())
+                        ComplexColumnData complexData = row.getComplexColumnData(column);
+                        for (Cell cell : complexData)
                         {
-                            Cell cell = iter.next();
                             if (type.kind == CollectionType.Kind.SET)
                             {
                                 if (type.nameComparator().compare(cell.path().get(0), value) == 0)
@@ -720,7 +709,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
 
             // In thrift, we actually allow expression on non-defined columns for the sake of filtering. To accomodate
             // this we create a "fake" definition. This is messy but it works so is probably good enough.
-            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null);
+            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type);
         }
 
         public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 794744a..c3a3c08 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -108,17 +108,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
-        delete(rowKey, clustering, cell.value(), cell.path(), new SimpleDeletionTime(cell.livenessInfo().timestamp(), nowInSec), opGroup);
+        delete(rowKey, clustering, cell.value(), cell.path(), new DeletionTime(cell.timestamp(), nowInSec), opGroup);
     }
 
     public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
-        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
-        Row.Writer writer = upd.writer();
-        Rows.writeClustering(makeIndexClustering(rowKey, clustering, path), writer);
-        writer.writeRowDeletion(deletion);
-        writer.endOfRow();
+
+        Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
         if (logger.isDebugEnabled())
             logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
@@ -126,18 +125,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
     {
-        insert(rowKey, clustering, cell, cell.livenessInfo(), opGroup);
+        insert(rowKey, clustering, cell, LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), opGroup);
     }
 
     public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
 
-        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
-        Row.Writer writer = upd.writer();
-        Rows.writeClustering(makeIndexClustering(rowKey, clustering, cell), writer);
-        writer.writePartitionKeyLivenessInfo(info);
-        writer.endOfRow();
+        Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+
         if (logger.isDebugEnabled())
             logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index ab8e688..897aa9c 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -87,17 +88,18 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
             long timestamp = row.primaryKeyLivenessInfo().timestamp();
             int ttl = row.primaryKeyLivenessInfo().ttl();
 
-            for (Cell cell : row)
+            for (Cell cell : row.cells())
             {
-                if (cell.isLive(nowInSec) && cell.livenessInfo().timestamp() > timestamp)
+                if (cell.isLive(nowInSec) && cell.timestamp() > timestamp)
                 {
-                    timestamp = cell.livenessInfo().timestamp();
-                    ttl = cell.livenessInfo().ttl();
+                    timestamp = cell.timestamp();
+                    ttl = cell.ttl();
                 }
             }
             maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
         }
-        for (Cell cell : row)
+
+        for (Cell cell : row.cells())
         {
             if (!indexes(cell.column()))
                 continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 1bd5452..aaefc9c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,15 +18,7 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -469,7 +461,8 @@ public class SecondaryIndexManager
             if (!row.deletion().isLive())
                 for (PerColumnSecondaryIndex index : indexes)
                     index.maybeDelete(key, clustering, row.deletion(), opGroup);
-            for (Cell cell : row)
+
+            for (Cell cell : row.cells())
             {
                 for (PerColumnSecondaryIndex index : indexes)
                 {
@@ -636,8 +629,7 @@ public class SecondaryIndexManager
         // Completely identical cells (including expiring columns with
         // identical ttl & localExpirationTime) will not get this far due
         // to the oldCell.equals(newCell) in StandardUpdater.update
-        return !oldCell.value().equals(newCell.value())
-            || oldCell.livenessInfo().timestamp() != newCell.livenessInfo().timestamp();
+        return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
     }
 
     private Set<String> filterByColumn(Set<String> idxNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 1d978a2..d4ca707 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -112,7 +112,7 @@ public abstract class SecondaryIndexSearcher
                 NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
                 BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
                 for (Clustering c : requested)
-                    clusterings.add(index.makeIndexClustering(pk, c, (Cell)null).takeAlias());
+                    clusterings.add(index.makeIndexClustering(pk, c, (Cell)null));
                 return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
             }
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 7a40a90..e073802 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -112,11 +112,8 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
     {
-        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, entry.indexValue, PartitionColumns.NONE, 1);
-        Row.Writer writer = upd.writer();
-        Rows.writeClustering(entry.indexClustering, writer);
-        writer.writeRowDeletion(new SimpleDeletionTime(entry.timestamp, nowInSec));
-        writer.endOfRow();
+        Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
 
         if (logger.isDebugEnabled())
@@ -159,10 +156,10 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
         {
             this.indexValue = indexValue;
-            this.indexClustering = indexClustering.takeAlias();
+            this.indexClustering = indexClustering;
             this.timestamp = timestamp;
             this.indexedKey = indexedKey;
-            this.indexedEntryClustering = indexedEntryClustering.takeAlias();
+            this.indexedEntryClustering = indexedEntryClustering;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index aa58511..6529ad9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -118,7 +118,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
     public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
     {
         if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
-            insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+            insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index 5af842c..30391cf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -90,10 +90,9 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
 
     public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        Iterator<Cell> iter = data.getCells(columnDef);
-        while (iter.hasNext())
+        ComplexColumnData complexData = data.getComplexColumnData(columnDef);
+        for (Cell cell : complexData)
         {
-            Cell cell = iter.next();
             if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
                 return false;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index d48e58b..a93f8e1 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -93,7 +93,7 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
     @Override
     public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
     {
-        insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+        insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 029dd3c..ce92164 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -171,49 +171,20 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                                                      final OpOrder.Group writeOp,
                                                      final int nowInSec)
     {
-        return new WrappingUnfilteredRowIterator(dataIter)
+        return new AlteringUnfilteredRowIterator(dataIter)
         {
             private int entriesIdx;
-            private Unfiltered next;
 
             @Override
-            public boolean hasNext()
-            {
-                return prepareNext();
-            }
-
-            @Override
-            public Unfiltered next()
+            protected Row computeNext(Row row)
             {
-                if (next == null)
-                    prepareNext();
+                CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
+                if (!index.isStale(row, indexValue, nowInSec))
+                    return row;
 
-                Unfiltered toReturn = next;
-                next = null;
-                return toReturn;
-            }
-
-            private boolean prepareNext()
-            {
-                if (next != null)
-                    return true;
-
-                while (super.hasNext())
-                {
-                    next = super.next();
-                    if (next.kind() != Unfiltered.Kind.ROW)
-                        return true;
-
-                    Row row = (Row)next;
-                    CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
-                    if (!index.isStale(row, indexValue, nowInSec))
-                        return true;
-
-                    // The entry is stale: delete the entry and ignore otherwise
-                    index.delete(entry, writeOp, nowInSec);
-                    next = null;
-                }
-                return false;
+                // The entry is stale: delete the entry and ignore otherwise
+                index.delete(entry, writeOp, nowInSec);
+                return null;
             }
 
             private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 6b53640..118fb75 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -138,7 +138,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
             // is the indexed name. Ans so we need to materialize the partition.
             ArrayBackedPartition result = ArrayBackedPartition.create(iterator);
             iterator.close();
-            Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes));
+            Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes));
             Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
             return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
                  ? null
@@ -173,10 +173,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
         {
             // Index is stale, remove the index entry and ignore
             index.delete(partitionKey.getKey(),
-                         new SimpleClustering(index.indexedColumn().name.bytes),
+                         new Clustering(index.indexedColumn().name.bytes),
                          indexedValue,
                          null,
-                         new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
                          writeOp);
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 78ead36..258a8a5 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.marshal;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -36,6 +35,7 @@ import org.apache.cassandra.serializers.MarshalException;
 
 import org.github.jamm.Unmetered;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -325,7 +325,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         if (valueLengthIfFixed() >= 0)
             out.write(value);
         else
-            ByteBufferUtil.writeWithLength(value, out);
+            ByteBufferUtil.writeWithVIntLength(value, out);
     }
 
     public long writtenLength(ByteBuffer value)
@@ -333,25 +333,25 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         assert value.hasRemaining();
         return valueLengthIfFixed() >= 0
              ? value.remaining()
-             : TypeSizes.sizeofWithLength(value);
+             : TypeSizes.sizeofWithVIntLength(value);
     }
 
-    public ByteBuffer readValue(DataInput in) throws IOException
+    public ByteBuffer readValue(DataInputPlus in) throws IOException
     {
         int length = valueLengthIfFixed();
         if (length >= 0)
             return ByteBufferUtil.read(in, length);
         else
-            return ByteBufferUtil.readWithLength(in);
+            return ByteBufferUtil.readWithVIntLength(in);
     }
 
-    public void skipValue(DataInput in) throws IOException
+    public void skipValue(DataInputPlus in) throws IOException
     {
         int length = valueLengthIfFixed();
-        if (length < 0)
-            length = in.readInt();
-
-        FileUtils.skipBytesFully(in, length);
+        if (length >= 0)
+            FileUtils.skipBytesFully(in, length);
+        else
+            ByteBufferUtil.skipWithVIntLength(in);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index a850305..9a096d0 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.List;
 import java.util.Iterator;
@@ -34,6 +33,7 @@ import org.apache.cassandra.cql3.Sets;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.CollectionSerializer;
@@ -236,23 +236,22 @@ public abstract class CollectionType<T> extends AbstractType<T>
     {
         public void serialize(CellPath path, DataOutputPlus out) throws IOException
         {
-            ByteBufferUtil.writeWithLength(path.get(0), out);
+            ByteBufferUtil.writeWithVIntLength(path.get(0), out);
         }
 
-        public CellPath deserialize(DataInput in) throws IOException
+        public CellPath deserialize(DataInputPlus in) throws IOException
         {
-            return CellPath.create(ByteBufferUtil.readWithLength(in));
+            return CellPath.create(ByteBufferUtil.readWithVIntLength(in));
         }
 
         public long serializedSize(CellPath path)
         {
-            return TypeSizes.sizeofWithLength(path.get(0));
+            return ByteBufferUtil.serializedSizeWithVIntLength(path.get(0));
         }
 
-        public void skip(DataInput in) throws IOException
+        public void skip(DataInputPlus in) throws IOException
         {
-            int length = in.readInt();
-            FileUtils.skipBytesFully(in, length);
+            ByteBufferUtil.skipWithVIntLength(in);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
deleted file mode 100644
index 6775cf1..0000000
--- a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
+++ /dev/null
@@ -1,850 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.SearchIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract common class for all non-thread safe Partition implementations.
- */
-public abstract class AbstractPartitionData implements Partition, Iterable<Row>
-{
-    private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class);
-
-    protected final CFMetaData metadata;
-    protected final DecoratedKey key;
-
-    protected final DeletionInfo deletionInfo;
-    protected final PartitionColumns columns;
-
-    protected Row staticRow;
-
-    protected int rows;
-
-    // The values for the clustering columns of the rows contained in this partition object. If
-    // clusteringSize is the size of the clustering comparator for this table, clusterings has size
-    // clusteringSize * rows where rows is the number of rows stored, and row i has it's clustering
-    // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)).
-    protected ByteBuffer[] clusterings;
-
-    // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i).
-    protected final LivenessInfoArray livenessInfos;
-    // The row deletion for the rows of this partition (row i has its row deletion at index i).
-    protected final DeletionTimeArray deletions;
-
-    // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition.
-    protected final RowDataBlock data;
-
-    // Stats over the rows stored in this partition.
-    private final RowStats.Collector statsCollector = new RowStats.Collector();
-
-    // The maximum timestamp for any data contained in this partition.
-    protected long maxTimestamp = Long.MIN_VALUE;
-
-    private AbstractPartitionData(CFMetaData metadata,
-                                    DecoratedKey key,
-                                    DeletionInfo deletionInfo,
-                                    ByteBuffer[] clusterings,
-                                    LivenessInfoArray livenessInfos,
-                                    DeletionTimeArray deletions,
-                                    PartitionColumns columns,
-                                    RowDataBlock data)
-    {
-        this.metadata = metadata;
-        this.key = key;
-        this.deletionInfo = deletionInfo;
-        this.clusterings = clusterings;
-        this.livenessInfos = livenessInfos;
-        this.deletions = deletions;
-        this.columns = columns;
-        this.data = data;
-
-        collectStats(deletionInfo.getPartitionDeletion());
-        Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
-        while (iter.hasNext())
-            collectStats(iter.next().deletionTime());
-    }
-
-    protected AbstractPartitionData(CFMetaData metadata,
-                                    DecoratedKey key,
-                                    DeletionInfo deletionInfo,
-                                    PartitionColumns columns,
-                                    RowDataBlock data,
-                                    int initialRowCapacity)
-    {
-        this(metadata,
-             key,
-             deletionInfo,
-             new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()],
-             new LivenessInfoArray(initialRowCapacity),
-             new DeletionTimeArray(initialRowCapacity),
-             columns,
-             data);
-    }
-
-    protected AbstractPartitionData(CFMetaData metadata,
-                                    DecoratedKey key,
-                                    DeletionTime partitionDeletion,
-                                    PartitionColumns columns,
-                                    int initialRowCapacity,
-                                    boolean sortable)
-    {
-        this(metadata,
-             key,
-             new DeletionInfo(partitionDeletion.takeAlias()),
-             columns,
-             new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()),
-             initialRowCapacity);
-    }
-
-    private void collectStats(DeletionTime dt)
-    {
-        statsCollector.updateDeletionTime(dt);
-        maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt());
-    }
-
-    private void collectStats(LivenessInfo info)
-    {
-        statsCollector.updateTimestamp(info.timestamp());
-        statsCollector.updateTTL(info.ttl());
-        statsCollector.updateLocalDeletionTime(info.localDeletionTime());
-        maxTimestamp = Math.max(maxTimestamp, info.timestamp());
-    }
-
-    public CFMetaData metadata()
-    {
-        return metadata;
-    }
-
-    public DecoratedKey partitionKey()
-    {
-        return key;
-    }
-
-    public DeletionTime partitionLevelDeletion()
-    {
-        return deletionInfo.getPartitionDeletion();
-    }
-
-    public PartitionColumns columns()
-    {
-        return columns;
-    }
-
-    public Row staticRow()
-    {
-        return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
-    }
-
-    public RowStats stats()
-    {
-        return statsCollector.get();
-    }
-
-    /**
-     * The deletion info for the partition update.
-     *
-     * <b>warning:</b> the returned object should be used in a read-only fashion. In particular,
-     * it should not be used to add new range tombstones to this deletion. For that,
-     * {@link addRangeTombstone} should be used instead. The reason being that adding directly to
-     * the returned object would bypass some stats collection that {@code addRangeTombstone} does.
-     *
-     * @return the deletion info for the partition update for use as read-only.
-     */
-    public DeletionInfo deletionInfo()
-    {
-        // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We
-        // could add the option of providing a read-only view of a DeletionInfo instead.
-        return deletionInfo;
-    }
-
-    public void addPartitionDeletion(DeletionTime deletionTime)
-    {
-        collectStats(deletionTime);
-        deletionInfo.add(deletionTime);
-    }
-
-    public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion)
-    {
-        addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias()));
-    }
-
-    public void addRangeTombstone(RangeTombstone range)
-    {
-        collectStats(range.deletionTime());
-        deletionInfo.add(range, metadata.comparator);
-    }
-
-    /**
-     * Swap row i and j.
-     *
-     * This is only used when we need to reorder rows because those were not added in clustering order,
-     * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. This method
-     * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but
-     * it should really only be used by subclasses (and with care) in practice.
-     */
-    public void swap(int i, int j)
-    {
-        int cs = metadata.clusteringColumns().size();
-        for (int k = 0; k < cs; k++)
-        {
-            ByteBuffer tmp = clusterings[j * cs + k];
-            clusterings[j * cs + k] = clusterings[i * cs + k];
-            clusterings[i * cs + k] = tmp;
-        }
-
-        livenessInfos.swap(i, j);
-        deletions.swap(i, j);
-        data.swap(i, j);
-    }
-
-    protected void merge(int i, int j, int nowInSec)
-    {
-        data.merge(i, j, nowInSec);
-        if (livenessInfos.timestamp(i) > livenessInfos.timestamp(j))
-            livenessInfos.move(i, j);
-        if (deletions.supersedes(i, j))
-            deletions.move(i, j);
-    }
-
-    protected void move(int i, int j)
-    {
-        int cs = metadata.clusteringColumns().size();
-        for (int k = 0; k < cs; k++)
-            clusterings[j * cs + k] = clusterings[i * cs + k];
-        data.move(i, j);
-        livenessInfos.move(i, j);
-        deletions.move(i, j);
-    }
-
-    public int rowCount()
-    {
-        return rows;
-    }
-
-    public boolean isEmpty()
-    {
-        return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty();
-    }
-
-    protected void clear()
-    {
-        rows = 0;
-        Arrays.fill(clusterings, null);
-        livenessInfos.clear();
-        deletions.clear();
-        data.clear();
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        CFMetaData metadata = metadata();
-        sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s",
-                    metadata.ksName,
-                    metadata.cfName,
-                    metadata.getKeyValidator().getString(partitionKey().getKey()),
-                    columns(),
-                    deletionInfo));
-
-        if (staticRow() != Rows.EMPTY_STATIC_ROW)
-            sb.append("\n    ").append(staticRow().toString(metadata, true));
-
-        // We use createRowIterator() directly instead of iterator() because that avoids
-        // sorting for PartitionUpdate (which inherit this method) and that is useful because
-        //  1) it can help with debugging and 2) we can't write after sorting but we want to
-        // be able to print an update while we build it (again for debugging)
-        Iterator<Row> iterator = createRowIterator(null, false);
-        while (iterator.hasNext())
-            sb.append("\n    ").append(iterator.next().toString(metadata, true));
-
-        return sb.toString();
-    }
-
-    protected void reverse()
-    {
-        for (int i = 0; i < rows / 2; i++)
-            swap(i, rows - 1 - i);
-    }
-
-    public Row getRow(Clustering clustering)
-    {
-        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
-        // Note that for statics, this will never return null, this will return an empty row. However,
-        // it's more consistent for this method to return null if we don't really have a static row.
-        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
-    }
-
-    /**
-     * Returns an iterator that iterators over the rows of this update in clustering order.
-     *
-     * @return an iterator over the rows of this update.
-     */
-    public Iterator<Row> iterator()
-    {
-        return createRowIterator(null, false);
-    }
-
-    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed)
-    {
-        final RowIterator iter = createRowIterator(columns, reversed);
-        return new SearchIterator<Clustering, Row>()
-        {
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public Row next(Clustering key)
-            {
-                if (key == Clustering.STATIC_CLUSTERING)
-                {
-                    if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty())
-                        return Rows.EMPTY_STATIC_ROW;
-
-                    return FilteringRow.columnsFilteringRow(columns).setTo(staticRow());
-                }
-
-                return iter.seekTo(key) ? iter.next() : null;
-            }
-        };
-    }
-
-    public UnfilteredRowIterator unfilteredIterator()
-    {
-        return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false);
-    }
-
-    public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed)
-    {
-        return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed));
-    }
-
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
-    {
-        return sliceableUnfilteredIterator(ColumnFilter.selection(columns()), false);
-    }
-
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed)
-    {
-        return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed)
-        {
-            private final RowIterator rowIterator = createRowIterator(selection, reversed);
-            private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed);
-
-            protected Unfiltered computeNext()
-            {
-                if (!mergeIterator.isSet())
-                    mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed));
-
-                return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
-            }
-
-            public Iterator<Unfiltered> slice(Slice slice)
-            {
-                return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed));
-            }
-        };
-    }
-
-    private RowIterator createRowIterator(ColumnFilter columns, boolean reversed)
-    {
-        return reversed ? new ReverseRowIterator(columns) : new ForwardRowIterator(columns);
-    }
-
-    /**
-     * An iterator over the rows of this partition that reuse the same row object.
-     */
-    private abstract class RowIterator extends UnmodifiableIterator<Row>
-    {
-        protected final InternalReusableClustering clustering = new InternalReusableClustering();
-        protected final InternalReusableRow reusableRow;
-        protected final FilteringRow filter;
-
-        protected int next;
-
-        protected RowIterator(final ColumnFilter columns)
-        {
-            this.reusableRow = new InternalReusableRow(clustering);
-            this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns);
-        }
-
-        /*
-         * Move the iterator so that row {@code name} is returned next by {@code next} if that
-         * row exists. Otherwise the first row sorting after {@code name} will be returned.
-         * Returns whether {@code name} was found or not.
-         */
-        public abstract boolean seekTo(Clustering name);
-
-        public abstract Iterator<Row> slice(Slice slice);
-
-        protected Row setRowTo(int row)
-        {
-            reusableRow.setTo(row);
-            return filter == null ? reusableRow : filter.setTo(reusableRow);
-        }
-
-        /**
-         * Simple binary search.
-         */
-        protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex)
-        {
-            int low = fromIndex;
-            int mid = toIndex;
-            int high = mid - 1;
-            int result = -1;
-            while (low <= high)
-            {
-                mid = (low + high) >> 1;
-                if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0)
-                    low = mid + 1;
-                else if (result == 0)
-                    return mid;
-                else
-                    high = mid - 1;
-            }
-            return -mid - (result < 0 ? 1 : 2);
-        }
-    }
-
-    private class ForwardRowIterator extends RowIterator
-    {
-        private ForwardRowIterator(ColumnFilter columns)
-        {
-            super(columns);
-            this.next = 0;
-        }
-
-        public boolean hasNext()
-        {
-            return next < rows;
-        }
-
-        public Row next()
-        {
-            return setRowTo(next++);
-        }
-
-        public boolean seekTo(Clustering name)
-        {
-            if (next >= rows)
-                return false;
-
-            int idx = binarySearch(name, next, rows);
-            next = idx >= 0 ? idx : -idx - 1;
-            return idx >= 0;
-        }
-
-        public Iterator<Row> slice(Slice slice)
-        {
-            int sidx = binarySearch(slice.start(), next, rows);
-            final int start = sidx >= 0 ? sidx : -sidx - 1;
-            if (start >= rows)
-                return Collections.emptyIterator();
-
-            int eidx = binarySearch(slice.end(), start, rows);
-            // The insertion point is the first element greater than slice.end(), so we want the previous index
-            final int end = eidx >= 0 ? eidx : -eidx - 2;
-
-            // Remember the end to speed up potential further slice search
-            next = end;
-
-            if (start > end)
-                return Collections.emptyIterator();
-
-            return new AbstractIterator<Row>()
-            {
-                private int i = start;
-
-                protected Row computeNext()
-                {
-                    if (i >= rows || i > end)
-                        return endOfData();
-
-                    return setRowTo(i++);
-                }
-            };
-        }
-    }
-
-    private class ReverseRowIterator extends RowIterator
-    {
-        private ReverseRowIterator(ColumnFilter columns)
-        {
-            super(columns);
-            this.next = rows - 1;
-        }
-
-        public boolean hasNext()
-        {
-            return next >= 0;
-        }
-
-        public Row next()
-        {
-            return setRowTo(next--);
-        }
-
-        public boolean seekTo(Clustering name)
-        {
-            // We only use that method with forward iterators.
-            throw new UnsupportedOperationException();
-        }
-
-        public Iterator<Row> slice(Slice slice)
-        {
-            int sidx = binarySearch(slice.end(), 0, next + 1);
-            // The insertion point is the first element greater than slice.end(), so we want the previous index
-            final int start = sidx >= 0 ? sidx : -sidx - 2;
-            if (start < 0)
-                return Collections.emptyIterator();
-
-            int eidx = binarySearch(slice.start(), 0, start + 1);
-            final int end = eidx >= 0 ? eidx : -eidx - 1;
-
-            // Remember the end to speed up potential further slice search
-            next = end;
-
-            if (start < end)
-                return Collections.emptyIterator();
-
-            return new AbstractIterator<Row>()
-            {
-                private int i = start;
-
-                protected Row computeNext()
-                {
-                    if (i < 0 || i < end)
-                        return endOfData();
-
-                    return setRowTo(i--);
-                }
-            };
-        }
-    }
-
-    /**
-     * A reusable view over the clustering of this partition.
-     */
-    protected class InternalReusableClustering extends Clustering
-    {
-        final int size = metadata.clusteringColumns().size();
-        private int base;
-
-        public int size()
-        {
-            return size;
-        }
-
-        public Clustering setTo(int row)
-        {
-            base = row * size;
-            return this;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            return clusterings[base + i];
-        }
-
-        public ByteBuffer[] getRawValues()
-        {
-            ByteBuffer[] values = new ByteBuffer[size];
-            for (int i = 0; i < size; i++)
-                values[i] = get(i);
-            return values;
-        }
-    };
-
-    /**
-     * A reusable view over the rows of this partition.
-     */
-    protected class InternalReusableRow extends AbstractReusableRow
-    {
-        private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor();
-        private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor();
-        private final InternalReusableClustering clustering;
-
-        private int row;
-
-        public InternalReusableRow()
-        {
-            this(new InternalReusableClustering());
-        }
-
-        public InternalReusableRow(InternalReusableClustering clustering)
-        {
-            this.clustering = clustering;
-        }
-
-        protected RowDataBlock data()
-        {
-            return data;
-        }
-
-        public Row setTo(int row)
-        {
-            this.clustering.setTo(row);
-            this.liveness.setTo(livenessInfos, row);
-            this.deletion.setTo(deletions, row);
-            this.row = row;
-            return this;
-        }
-
-        protected int row()
-        {
-            return row;
-        }
-
-        public Clustering clustering()
-        {
-            return clustering;
-        }
-
-        public LivenessInfo primaryKeyLivenessInfo()
-        {
-            return liveness;
-        }
-
-        public DeletionTime deletion()
-        {
-            return deletion;
-        }
-    };
-
-    private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator
-    {
-        private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder)
-        {
-            super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats());
-        }
-    }
-
-    /**
-     * A row writer to add rows to this partition.
-     */
-    protected class Writer extends RowDataBlock.Writer
-    {
-        private int clusteringBase;
-
-        private int simpleColumnsSetInRow;
-        private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
-
-        public Writer(boolean inOrderCells)
-        {
-            super(data, inOrderCells);
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            ensureCapacity(row);
-            clusterings[clusteringBase++] = value;
-        }
-
-        public void writePartitionKeyLivenessInfo(LivenessInfo info)
-        {
-            ensureCapacity(row);
-            livenessInfos.set(row, info);
-            collectStats(info);
-        }
-
-        public void writeRowDeletion(DeletionTime deletion)
-        {
-            ensureCapacity(row);
-            if (!deletion.isLive())
-                deletions.set(row, deletion);
-
-            collectStats(deletion);
-        }
-
-        @Override
-        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            ensureCapacity(row);
-            collectStats(info);
-
-            if (column.isComplex())
-                complexColumnsSetInRow.add(column);
-            else
-                ++simpleColumnsSetInRow;
-
-            super.writeCell(column, isCounter, value, info, path);
-        }
-
-        @Override
-        public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
-        {
-            ensureCapacity(row);
-            collectStats(complexDeletion);
-
-            super.writeComplexDeletion(c, complexDeletion);
-        }
-
-        @Override
-        public void endOfRow()
-        {
-            super.endOfRow();
-            ++rows;
-
-            statsCollector.updateColumnSetPerRow(simpleColumnsSetInRow + complexColumnsSetInRow.size());
-
-            simpleColumnsSetInRow = 0;
-            complexColumnsSetInRow.clear();
-        }
-
-        public int currentRow()
-        {
-            return row;
-        }
-
-        private void ensureCapacity(int rowToSet)
-        {
-            int originalCapacity = livenessInfos.size();
-            if (rowToSet < originalCapacity)
-                return;
-
-            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
-
-            int clusteringSize = metadata.clusteringColumns().size();
-
-            clusterings = Arrays.copyOf(clusterings, newCapacity * clusteringSize);
-
-            livenessInfos.resize(newCapacity);
-            deletions.resize(newCapacity);
-        }
-
-        @Override
-        public Writer reset()
-        {
-            super.reset();
-            clusteringBase = 0;
-            simpleColumnsSetInRow = 0;
-            complexColumnsSetInRow.clear();
-            return this;
-        }
-    }
-
-    /**
-     * A range tombstone marker writer to add range tombstone markers to this partition.
-     */
-    protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer
-    {
-        private final boolean reversed;
-
-        private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()];
-        private int size;
-        private RangeTombstone.Bound.Kind nextKind;
-
-        private Slice.Bound openBound;
-        private DeletionTime openDeletion;
-
-        public RangeTombstoneCollector(boolean reversed)
-        {
-            this.reversed = reversed;
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            nextValues[size++] = value;
-        }
-
-        public void writeBoundKind(RangeTombstone.Bound.Kind kind)
-        {
-            nextKind = kind;
-        }
-
-        private ByteBuffer[] getValues()
-        {
-            return Arrays.copyOfRange(nextValues, 0, size);
-        }
-
-        private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
-        {
-            openBound = Slice.Bound.create(kind, getValues());
-            openDeletion = deletion.takeAlias();
-        }
-
-        private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
-        {
-            assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was "  + deletion;
-            Slice.Bound closeBound = Slice.Bound.create(kind, getValues());
-            Slice slice = reversed
-                        ? Slice.make(closeBound, openBound)
-                        : Slice.make(openBound, closeBound);
-            addRangeTombstone(slice, openDeletion);
-        }
-
-        public void writeBoundDeletion(DeletionTime deletion)
-        {
-            assert !nextKind.isBoundary();
-            if (nextKind.isOpen(reversed))
-                open(nextKind, deletion);
-            else
-                close(nextKind, deletion);
-        }
-
-        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
-        {
-            assert nextKind.isBoundary();
-            DeletionTime closeTime = reversed ? startDeletion : endDeletion;
-            DeletionTime openTime = reversed ? endDeletion : startDeletion;
-
-            close(nextKind.closeBoundOfBoundary(reversed), closeTime);
-            open(nextKind.openBoundOfBoundary(reversed), openTime);
-        }
-
-        public void endOfMarker()
-        {
-            clear();
-        }
-
-        private void addRangeTombstone(Slice deletionSlice, DeletionTime dt)
-        {
-            AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt);
-        }
-
-        private void clear()
-        {
-            size = 0;
-            Arrays.fill(nextValues, null);
-            nextKind = null;
-        }
-
-        public void reset()
-        {
-            openBound = null;
-            openDeletion = null;
-            clear();
-        }
-    }
-}