You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/24 15:00:36 UTC

[1/2] cassandra git commit: Minor improvements to RowStats

Repository: cassandra
Updated Branches:
  refs/heads/trunk 8a9796902 -> c055ab997


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 8a83242..0b06405 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -18,27 +18,15 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.RowStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CounterId;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index ee646aa..132a095 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -231,7 +231,7 @@ public class StreamReader
             return staticRow;
         }
 
-        public RowStats stats()
+        public EncodingStats stats()
         {
             return header.stats();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 899fee7..e35e996 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -26,8 +26,8 @@ import java.util.Arrays;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.db.rows.RowStats;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.*;
@@ -166,7 +166,7 @@ public class PartitionTest
 
         RowUpdateBuilder.deleteRowAt(cfs.metadata, 10L, localDeletionTime, "key1", "c").applyUnsafe();
         ArrayBackedPartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
-        RowStats stats = partition.stats();
+        EncodingStats stats = partition.stats();
         assertEquals(localDeletionTime, stats.minLocalDeletionTime);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 99aa5e0..1d91069 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -22,8 +22,7 @@ import java.io.File;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
@@ -45,7 +44,7 @@ public class RowIndexEntryTest extends CQLTester
         final RowIndexEntry simple = new RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), RowStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
         RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index fcf2630..b18e67b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -32,7 +32,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
 import org.apache.cassandra.dht.Range;
@@ -158,7 +158,7 @@ public class AntiCompactionTest
         File dir = cfs.directories.getDirectoryForNewSSTables();
         String filename = cfs.getTempSSTablePath(dir);
 
-        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), RowStats.NO_STATS)))
+        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index 1e5c23f..e6d8cb0 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -356,7 +356,7 @@ public class RowAndDeletionMergeIteratorTest
                                                ColumnFilter.all(cfm),
                                                Rows.EMPTY_STATIC_ROW,
                                                reversed,
-                                               RowStats.NO_STATS,
+                                               EncodingStats.NO_STATS,
                                                rows,
                                                tombstones,
                                                true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index e869c72..fd775b4 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -32,15 +32,12 @@ import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Slice.Bound;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.rows.Unfiltered.Kind;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SearchIterator;
 
 public class UnfilteredRowIteratorsMergeTest
 {
@@ -412,7 +409,7 @@ public class UnfilteredRowIteratorsMergeTest
                   UnfilteredRowIteratorsMergeTest.metadata.partitionColumns(),
                   null,
                   reversed,
-                  RowStats.NO_STATS);
+                  EncodingStats.NO_STATS);
             this.content = content;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 97de022..29875d5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@ -70,7 +70,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(String file) throws IOException
         {
-            this(file, SSTableWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), RowStats.NO_STATS)));
+            this(file, SSTableWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(String file, SSTableWriter sw) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0f822c1..579f981 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionController;
@@ -51,7 +51,6 @@ import org.apache.cassandra.db.compaction.CompactionIterator;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
 import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -922,7 +921,7 @@ public class SSTableRewriterTest extends SchemaLoader
             File dir = cfs.directories.getDirectoryForNewSSTables();
             String filename = cfs.getTempSSTablePath(dir);
 
-            try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), RowStats.NO_STATS)))
+            try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -990,7 +989,7 @@ public class SSTableRewriterTest extends SchemaLoader
     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
     {
         String filename = cfs.getTempSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), RowStats.NO_STATS));
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS));
     }
 
     public static ByteBuffer random(int i, int size)


[2/2] cassandra git commit: Minor improvements to RowStats

Posted by sl...@apache.org.
Minor improvements to RowStats

patch by slebresne; reviewed by JoshuaMcKenzie for CASSANDRA-9828


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c055ab99
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c055ab99
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c055ab99

Branch: refs/heads/trunk
Commit: c055ab997ea66faccdb10eddd5241f909ff73408
Parents: 8a97969
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jul 14 15:25:02 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jul 24 14:59:57 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  12 +-
 .../cassandra/db/MutableDeletionInfo.java       |   4 +-
 .../apache/cassandra/db/RangeTombstoneList.java |   3 +-
 .../cassandra/db/SerializationHeader.java       | 143 ++++++----
 .../columniterator/AbstractSSTableIterator.java |   4 +-
 .../columniterator/SSTableReversedIterator.java |   4 +-
 .../partitions/ArrayBackedCachedPartition.java  |   2 +-
 .../db/partitions/ArrayBackedPartition.java     |   6 +-
 .../db/partitions/AtomicBTreePartition.java     |  10 +-
 .../db/partitions/FilteredPartition.java        |   4 +-
 .../cassandra/db/partitions/Partition.java      |   2 +-
 .../db/partitions/PartitionUpdate.java          |  20 +-
 .../db/rows/AbstractUnfilteredRowIterator.java  |   9 +-
 .../apache/cassandra/db/rows/BufferCell.java    |  26 +-
 .../apache/cassandra/db/rows/EncodingStats.java | 272 +++++++++++++++++++
 .../LazilyInitializedUnfilteredRowIterator.java |   2 +-
 .../db/rows/RowAndDeletionMergeIterator.java    |   2 +-
 .../org/apache/cassandra/db/rows/RowStats.java  | 263 ------------------
 .../db/rows/UnfilteredRowIterator.java          |   2 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |  33 +--
 .../db/rows/UnfilteredRowIterators.java         |   8 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |  63 +++--
 .../db/rows/WrappingUnfilteredRowIterator.java  |   4 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   8 +-
 .../io/sstable/SSTableIdentityIterator.java     |   4 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   9 +-
 .../io/sstable/SSTableSimpleWriter.java         |  12 -
 .../cassandra/streaming/StreamReader.java       |   2 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   4 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |   5 +-
 .../db/compaction/AntiCompactionTest.java       |   4 +-
 .../rows/RowAndDeletionMergeIteratorTest.java   |   2 +-
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   5 +-
 .../io/sstable/BigTableWriterTest.java          |   4 +-
 .../io/sstable/SSTableRewriterTest.java         |   7 +-
 38 files changed, 472 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7cfa51..70b26f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,7 +5,7 @@
  * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
  * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
  * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825, 9848,
-   9705, 9859, 9867, 9874)
+   9705, 9859, 9867, 9874, 9828, 9801)
  * Update Guava to 18.0 (CASSANDRA-9653)
  * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
  * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 0b5df06..5bec812 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db;
 import java.util.Iterator;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -54,7 +54,7 @@ public interface DeletionInfo extends IMeasurableMemory
 
     public RangeTombstone rangeCovering(Clustering name);
 
-    public void collectStats(RowStats.Collector collector);
+    public void collectStats(EncodingStats.Collector collector);
 
     public int dataSize();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index dc13700..501dbb2 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -334,7 +334,7 @@ public abstract class LegacyLayout
                                                ColumnFilter.all(metadata),
                                                staticRow,
                                                reversed,
-                                               RowStats.NO_STATS,
+                                               EncodingStats.NO_STATS,
                                                rows,
                                                ranges,
                                                true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index cd5560e..71e03d5 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -423,7 +423,7 @@ public class Memtable implements Comparable<Memtable>
 
         public SSTableWriter createFlushWriter(String filename,
                                                PartitionColumns columns,
-                                               RowStats stats)
+                                               EncodingStats stats)
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
             return SSTableWriter.create(Descriptor.fromFilename(filename),
@@ -503,20 +503,20 @@ public class Memtable implements Comparable<Memtable>
 
     private static class StatsCollector
     {
-        private final AtomicReference<RowStats> stats = new AtomicReference<>(RowStats.NO_STATS);
+        private final AtomicReference<EncodingStats> stats = new AtomicReference<>(EncodingStats.NO_STATS);
 
-        public void update(RowStats newStats)
+        public void update(EncodingStats newStats)
         {
             while (true)
             {
-                RowStats current = stats.get();
-                RowStats updated = current.mergeWith(newStats);
+                EncodingStats current = stats.get();
+                EncodingStats updated = current.mergeWith(newStats);
                 if (stats.compareAndSet(current, updated))
                     return;
             }
         }
 
-        public RowStats get()
+        public EncodingStats get()
         {
             return stats.get();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
index 6b19283..d01b1d1 100644
--- a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
@@ -23,7 +23,7 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -250,7 +250,7 @@ public class MutableDeletionInfo implements DeletionInfo
         return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
     }
 
-    public void collectStats(RowStats.Collector collector)
+    public void collectStats(EncodingStats.Collector collector)
     {
         collector.update(partitionDeletion);
         if (ranges != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 96bcdb1..59bebf7 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Data structure holding the range tombstones of a ColumnFamily.
@@ -308,7 +307,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         return max;
     }
 
-    public void collectStats(RowStats.Collector collector)
+    public void collectStats(EncodingStats.Collector collector)
     {
         for (int i = 0; i < size; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 5784260..c054b25 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -43,29 +43,23 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SerializationHeader
 {
-    private static final int DEFAULT_BASE_DELETION = computeDefaultBaseDeletion();
-
     public static final Serializer serializer = new Serializer();
 
     private final AbstractType<?> keyType;
     private final List<AbstractType<?>> clusteringTypes;
 
     private final PartitionColumns columns;
-    private final RowStats stats;
+    private final EncodingStats stats;
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
-    private final long baseTimestamp;
-    public final int baseDeletionTime;
-    private final int baseTTL;
-
     // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details.
     private final boolean useSparseColumnLayout;
 
     private SerializationHeader(AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 PartitionColumns columns,
-                                RowStats stats,
+                                EncodingStats stats,
                                 Map<ByteBuffer, AbstractType<?>> typeMap)
     {
         this.keyType = keyType;
@@ -74,15 +68,6 @@ public class SerializationHeader
         this.stats = stats;
         this.typeMap = typeMap;
 
-        // Not that if a given stats is unset, it means that either it's unused (there is
-        // no tombstone whatsoever for instance) or that we have no information on it. In
-        // that former case, it doesn't matter which base we use but in the former, we use
-        // bases that are more likely to provide small encoded values than the default
-        // "unset" value.
-        this.baseTimestamp = stats.hasMinTimestamp() ? stats.minTimestamp : 0;
-        this.baseDeletionTime = stats.hasMinLocalDeletionTime() ? stats.minLocalDeletionTime : DEFAULT_BASE_DELETION;
-        this.baseTTL = stats.minTTL;
-
         // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1
         // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
         // only for the columns after the 128th one and for simplicity we assume that once you have that many column,
@@ -113,7 +98,7 @@ public class SerializationHeader
         return new SerializationHeader(BytesType.instance,
                                        clusteringTypes,
                                        PartitionColumns.NONE,
-                                       RowStats.NO_STATS,
+                                       EncodingStats.NO_STATS,
                                        Collections.<ByteBuffer, AbstractType<?>>emptyMap());
     }
 
@@ -121,7 +106,7 @@ public class SerializationHeader
     {
         // The serialization header has to be computed before the start of compaction (since it's used to write)
         // the result. This means that when compacting multiple sources, we won't have perfectly accurate stats
-        // (for RowStats) since compaction may delete, purge and generally merge rows in unknown ways. This is
+        // (for EncodingStats) since compaction may delete, purge and generally merge rows in unknown ways. This is
         // kind of ok because those stats are only used for optimizing the underlying storage format and so we
         // just have to strive for as good as possible. Currently, we stick to a relatively naive merge of existing
         // global stats because it's simple and probably good enough in most situation but we could probably
@@ -129,7 +114,7 @@ public class SerializationHeader
         // Note however that to avoid seeing our accuracy degrade through successive compactions, we don't base
         // our stats merging on the compacted files headers, which as we just said can be somewhat inaccurate,
         // but rather on their stats stored in StatsMetadata that are fully accurate.
-        RowStats.Collector stats = new RowStats.Collector();
+        EncodingStats.Collector stats = new EncodingStats.Collector();
         PartitionColumns.Builder columns = PartitionColumns.builder();
         for (SSTableReader sstable : sstables)
         {
@@ -147,7 +132,7 @@ public class SerializationHeader
 
     public SerializationHeader(CFMetaData metadata,
                                PartitionColumns columns,
-                               RowStats stats)
+                               EncodingStats stats)
     {
         this(metadata.getKeyValidator(),
              typesOf(metadata.clusteringColumns()),
@@ -171,23 +156,7 @@ public class SerializationHeader
         return !columns.statics.isEmpty();
     }
 
-    private static int computeDefaultBaseDeletion()
-    {
-        // We need a fixed default, but one that is likely to provide small values (close to 0) when
-        // substracted to deletion times. Since deletion times are 'the current time in seconds', we
-        // use as base Jan 1, 2015 (in seconds).
-        Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT-0"), Locale.US);
-        c.set(Calendar.YEAR, 2015);
-        c.set(Calendar.MONTH, Calendar.JANUARY);
-        c.set(Calendar.DAY_OF_MONTH, 1);
-        c.set(Calendar.HOUR_OF_DAY, 0);
-        c.set(Calendar.MINUTE, 0);
-        c.set(Calendar.SECOND, 0);
-        c.set(Calendar.MILLISECOND, 0);
-        return (int)(c.getTimeInMillis() / 1000);
-    }
-
-    public RowStats stats()
+    public EncodingStats stats()
     {
         return stats;
     }
@@ -212,34 +181,89 @@ public class SerializationHeader
         return typeMap == null ? column.type : typeMap.get(column.name.bytes);
     }
 
-    public long encodeTimestamp(long timestamp)
+    public void writeTimestamp(long timestamp, DataOutputPlus out) throws IOException
+    {
+        out.writeVInt(timestamp - stats.minTimestamp);
+    }
+
+    public void writeLocalDeletionTime(int localDeletionTime, DataOutputPlus out) throws IOException
+    {
+        out.writeVInt(localDeletionTime - stats.minLocalDeletionTime);
+    }
+
+    public void writeTTL(int ttl, DataOutputPlus out) throws IOException
+    {
+        out.writeVInt(ttl - stats.minTTL);
+    }
+
+    public void writeDeletionTime(DeletionTime dt, DataOutputPlus out) throws IOException
+    {
+        writeTimestamp(dt.markedForDeleteAt(), out);
+        writeLocalDeletionTime(dt.localDeletionTime(), out);
+    }
+
+    public long readTimestamp(DataInputPlus in) throws IOException
+    {
+        return in.readVInt() + stats.minTimestamp;
+    }
+
+    public int readLocalDeletionTime(DataInputPlus in) throws IOException
+    {
+        return (int)in.readVInt() + stats.minLocalDeletionTime;
+    }
+
+    public int readTTL(DataInputPlus in) throws IOException
+    {
+        return (int)in.readVInt() + stats.minTTL;
+    }
+
+    public DeletionTime readDeletionTime(DataInputPlus in) throws IOException
+    {
+        long markedAt = readTimestamp(in);
+        int localDeletionTime = readLocalDeletionTime(in);
+        return new DeletionTime(markedAt, localDeletionTime);
+    }
+
+    public long timestampSerializedSize(long timestamp)
+    {
+        return TypeSizes.sizeofVInt(timestamp - stats.minTimestamp);
+    }
+
+    public long localDeletionTimeSerializedSize(int localDeletionTime)
+    {
+        return TypeSizes.sizeofVInt(localDeletionTime - stats.minLocalDeletionTime);
+    }
+
+    public long ttlSerializedSize(int ttl)
     {
-        return timestamp - baseTimestamp;
+        return TypeSizes.sizeofVInt(ttl - stats.minTTL);
     }
 
-    public long decodeTimestamp(long timestamp)
+    public long deletionTimeSerializedSize(DeletionTime dt)
     {
-        return baseTimestamp + timestamp;
+        return timestampSerializedSize(dt.markedForDeleteAt())
+             + localDeletionTimeSerializedSize(dt.localDeletionTime());
     }
 
-    public int encodeDeletionTime(int deletionTime)
+    public void skipTimestamp(DataInputPlus in) throws IOException
     {
-        return deletionTime - baseDeletionTime;
+        in.readVInt();
     }
 
-    public int decodeDeletionTime(int deletionTime)
+    public void skipLocalDeletionTime(DataInputPlus in) throws IOException
     {
-        return baseDeletionTime + deletionTime;
+        in.readVInt();
     }
 
-    public int encodeTTL(int ttl)
+    public void skipTTL(DataInputPlus in) throws IOException
     {
-        return ttl - baseTTL;
+        in.readVInt();
     }
 
-    public int decodeTTL(int ttl)
+    public void skipDeletionTime(DataInputPlus in) throws IOException
     {
-        return baseTTL + ttl;
+        skipTimestamp(in);
+        skipLocalDeletionTime(in);
     }
 
     public Component toComponent()
@@ -256,8 +280,7 @@ public class SerializationHeader
     @Override
     public String toString()
     {
-        return String.format("SerializationHeader[key=%s, cks=%s, columns=%s, stats=%s, typeMap=%s, baseTs=%d, baseDt=%s, baseTTL=%s]",
-                             keyType, clusteringTypes, columns, stats, typeMap, baseTimestamp, baseDeletionTime, baseTTL);
+        return String.format("SerializationHeader[key=%s, cks=%s, columns=%s, stats=%s, typeMap=%s]", keyType, clusteringTypes, columns, stats, typeMap);
     }
 
     /**
@@ -270,13 +293,13 @@ public class SerializationHeader
         private final List<AbstractType<?>> clusteringTypes;
         private final Map<ByteBuffer, AbstractType<?>> staticColumns;
         private final Map<ByteBuffer, AbstractType<?>> regularColumns;
-        private final RowStats stats;
+        private final EncodingStats stats;
 
         private Component(AbstractType<?> keyType,
                           List<AbstractType<?>> clusteringTypes,
                           Map<ByteBuffer, AbstractType<?>> staticColumns,
                           Map<ByteBuffer, AbstractType<?>> regularColumns,
-                          RowStats stats)
+                          EncodingStats stats)
         {
             this.keyType = keyType;
             this.clusteringTypes = clusteringTypes;
@@ -351,7 +374,7 @@ public class SerializationHeader
     {
         public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException
         {
-            RowStats.serializer.serialize(header.stats, out);
+            EncodingStats.serializer.serialize(header.stats, out);
 
             if (hasStatic)
                 Columns.serializer.serialize(header.columns.statics, out);
@@ -360,7 +383,7 @@ public class SerializationHeader
 
         public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException
         {
-            RowStats stats = RowStats.serializer.deserialize(in);
+            EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
             AbstractType<?> keyType = metadata.getKeyValidator();
             List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns());
@@ -373,7 +396,7 @@ public class SerializationHeader
 
         public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic)
         {
-            long size = RowStats.serializer.serializedSize(header.stats);
+            long size = EncodingStats.serializer.serializedSize(header.stats);
 
             if (hasStatic)
                 size += Columns.serializer.serializedSize(header.columns.statics);
@@ -384,7 +407,7 @@ public class SerializationHeader
         // For SSTables
         public void serialize(Component header, DataOutputPlus out) throws IOException
         {
-            RowStats.serializer.serialize(header.stats, out);
+            EncodingStats.serializer.serialize(header.stats, out);
 
             writeType(header.keyType, out);
             out.writeShort(header.clusteringTypes.size());
@@ -398,7 +421,7 @@ public class SerializationHeader
         // For SSTables
         public Component deserialize(Version version, DataInputPlus in) throws IOException
         {
-            RowStats stats = RowStats.serializer.deserialize(in);
+            EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
             AbstractType<?> keyType = readType(in);
             int size = in.readUnsignedShort();
@@ -418,7 +441,7 @@ public class SerializationHeader
         // For SSTables
         public int serializedSize(Component header)
         {
-            int size = RowStats.serializer.serializedSize(header.stats);
+            int size = EncodingStats.serializer.serializedSize(header.stats);
 
             size += sizeofType(header.keyType);
             size += TypeSizes.sizeof((short)header.clusteringTypes.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index fe8d1f0..5e6165f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -210,11 +210,11 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         return staticRow;
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new RowStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
     }
 
     public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4082874..f4acd6f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -331,9 +331,9 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             return Rows.EMPTY_STATIC_ROW; // we don't actually use that
         }
 
-        public RowStats stats()
+        public EncodingStats stats()
         {
-            return RowStats.NO_STATS; // we don't actually use that
+            return EncodingStats.NO_STATS; // we don't actually use that
         }
 
         public void add(Unfiltered unfiltered)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index e2ec06d..a3c8768 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -45,7 +45,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
                                        Row staticRow,
                                        List<Row> rows,
                                        DeletionInfo deletionInfo,
-                                       RowStats stats,
+                                       EncodingStats stats,
                                        int createdAtInSec,
                                        int cachedLiveRows,
                                        int rowsWithNonExpiringCells,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
index 4485117..79c65dc 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
@@ -27,7 +27,7 @@ public class ArrayBackedPartition extends AbstractThreadUnsafePartition
 {
     private final Row staticRow;
     private final DeletionInfo deletionInfo;
-    private final RowStats stats;
+    private final EncodingStats stats;
 
     protected ArrayBackedPartition(CFMetaData metadata,
                                    DecoratedKey partitionKey,
@@ -35,7 +35,7 @@ public class ArrayBackedPartition extends AbstractThreadUnsafePartition
                                    Row staticRow,
                                    List<Row> rows,
                                    DeletionInfo deletionInfo,
-                                   RowStats stats)
+                                   EncodingStats stats)
     {
         super(metadata, partitionKey, columns, rows);
         this.staticRow = staticRow;
@@ -107,7 +107,7 @@ public class ArrayBackedPartition extends AbstractThreadUnsafePartition
         return deletionInfo;
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         return stats;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 1361422..c06ffd5 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -86,7 +86,7 @@ public class AtomicBTreePartition implements Partition
 
     private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker");
 
-    private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, RowStats.NO_STATS);
+    private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
 
     private final CFMetaData metadata;
     private final DecoratedKey partitionKey;
@@ -135,7 +135,7 @@ public class AtomicBTreePartition implements Partition
         return !BTree.isEmpty(ref.tree);
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         return ref.stats;
     }
@@ -338,7 +338,7 @@ public class AtomicBTreePartition implements Partition
                               ? current.staticRow
                               : (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
                 Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
-                RowStats newStats = current.stats.mergeWith(update.stats());
+                EncodingStats newStats = current.stats.mergeWith(update.stats());
 
                 if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
                 {
@@ -422,9 +422,9 @@ public class AtomicBTreePartition implements Partition
         // the btree of rows
         final Object[] tree;
         final Row staticRow;
-        final RowStats stats;
+        final EncodingStats stats;
 
-        Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, RowStats stats)
+        Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats)
         {
             this.tree = tree;
             this.deletionInfo = deletionInfo;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 1cac274..3a57d1a 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -79,9 +79,9 @@ public class FilteredPartition extends AbstractThreadUnsafePartition
         return DeletionInfo.LIVE;
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
-        return RowStats.NO_STATS;
+        return EncodingStats.NO_STATS;
     }
 
     public RowIterator rowIterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/Partition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 71d0411..04568e9 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -40,7 +40,7 @@ public interface Partition
 
     public PartitionColumns columns();
 
-    public RowStats stats();
+    public EncodingStats stats();
 
     /**
      * Whether the partition object has no informations at all, including any deletion informations.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index c9788e6..689b832 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -70,7 +70,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     private boolean canReOpen = true;
 
     private final MutableDeletionInfo deletionInfo;
-    private RowStats stats; // will be null if isn't built
+    private EncodingStats stats; // will be null if isn't built
 
     private Row staticRow = Rows.EMPTY_STATIC_ROW;
 
@@ -82,7 +82,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                             Row staticRow,
                             List<Row> rows,
                             MutableDeletionInfo deletionInfo,
-                            RowStats stats,
+                            EncodingStats stats,
                             boolean isBuilt,
                             boolean canHaveShadowedData)
     {
@@ -112,7 +112,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
      */
     public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key)
     {
-        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false);
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false);
     }
 
     /**
@@ -127,7 +127,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
      */
     public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec)
     {
-        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new MutableDeletionInfo(timestamp, nowInSec), RowStats.NO_STATS, true, false);
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new MutableDeletionInfo(timestamp, nowInSec), EncodingStats.NO_STATS, true, false);
     }
 
     /**
@@ -142,8 +142,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
     {
         return row.isStatic()
-             ? new PartitionUpdate(metadata, key, new PartitionColumns(row.columns(), Columns.NONE), row, Collections.<Row>emptyList(), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false)
-             : new PartitionUpdate(metadata, key, new PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, Collections.singletonList(row), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false);
+             ? new PartitionUpdate(metadata, key, new PartitionColumns(row.columns(), Columns.NONE), row, Collections.<Row>emptyList(), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false)
+             : new PartitionUpdate(metadata, key, new PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, Collections.singletonList(row), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false);
     }
 
     /**
@@ -182,7 +182,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
 
         List<Row> rows = new ArrayList<>();
 
-        RowStats.Collector collector = new RowStats.Collector();
+        EncodingStats.Collector collector = new EncodingStats.Collector();
 
         while (iterator.hasNext())
         {
@@ -285,7 +285,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         MutableDeletionInfo deletion = MutableDeletionInfo.live();
         Row staticRow = Rows.EMPTY_STATIC_ROW;
         List<Iterator<Row>> updateRowIterators = new ArrayList<>(size);
-        RowStats stats = RowStats.NO_STATS;
+        EncodingStats stats = EncodingStats.NO_STATS;
 
         for (PartitionUpdate update : updates)
         {
@@ -412,7 +412,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         return super.rowCount();
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         maybeBuild();
         return stats;
@@ -649,7 +649,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
 
     private void finishBuild()
     {
-        RowStats.Collector collector = new RowStats.Collector();
+        EncodingStats.Collector collector = new EncodingStats.Collector();
         deletionInfo.collectStats(collector);
         for (Row row : rows)
             Rows.collectStats(row, collector);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
index b4f849a..1d95c97 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
@@ -17,10 +17,7 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.Objects;
-
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -33,7 +30,7 @@ public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unf
     protected final PartitionColumns columns;
     protected final Row staticRow;
     protected final boolean isReverseOrder;
-    protected final RowStats stats;
+    protected final EncodingStats stats;
 
     protected AbstractUnfilteredRowIterator(CFMetaData metadata,
                                             DecoratedKey partitionKey,
@@ -41,7 +38,7 @@ public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unf
                                             PartitionColumns columns,
                                             Row staticRow,
                                             boolean isReverseOrder,
-                                            RowStats stats)
+                                            EncodingStats stats)
     {
         this.metadata = metadata;
         this.partitionKey = partitionKey;
@@ -82,7 +79,7 @@ public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unf
         return staticRow;
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         return stats;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index c339092..e952748 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -263,12 +263,12 @@ public class BufferCell extends AbstractCell
             out.writeByte((byte)flags);
 
             if (!useRowTimestamp)
-                out.writeVInt(header.encodeTimestamp(cell.timestamp()));
+                header.writeTimestamp(cell.timestamp(), out);
 
             if ((isDeleted || isExpiring) && !useRowTTL)
-                out.writeVInt(header.encodeDeletionTime(cell.localDeletionTime()));
+                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
             if (isExpiring && !useRowTTL)
-                out.writeVInt(header.encodeTTL(cell.ttl()));
+                header.writeTTL(cell.ttl(), out);
 
             if (cell.column().isComplex())
                 cell.column().cellPathSerializer().serialize(cell.path(), out);
@@ -289,15 +289,13 @@ public class BufferCell extends AbstractCell
             boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
             boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
 
-            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.decodeTimestamp(in.readVInt());
+            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
 
             int localDeletionTime = useRowTTL
                                   ? rowLiveness.localExpirationTime()
-                                  : (isDeleted || isExpiring ? header.decodeDeletionTime((int)in.readVInt()) : NO_DELETION_TIME);
+                                  : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
 
-            int ttl = useRowTTL
-                    ? rowLiveness.ttl()
-                    : (isExpiring ? header.decodeTTL((int)in.readVInt()) : NO_TTL);
+            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
 
             CellPath path = column.isComplex()
                           ? column.cellPathSerializer().deserialize(in)
@@ -337,12 +335,12 @@ public class BufferCell extends AbstractCell
             boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
 
             if (!useRowTimestamp)
-                size += TypeSizes.sizeofVInt(header.encodeTimestamp(cell.timestamp()));
+                size += header.timestampSerializedSize(cell.timestamp());
 
             if ((isDeleted || isExpiring) && !useRowTTL)
-                size += TypeSizes.sizeofVInt(header.encodeDeletionTime(cell.localDeletionTime()));
+                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
             if (isExpiring && !useRowTTL)
-                size += TypeSizes.sizeofVInt(header.encodeTTL(cell.ttl()));
+                size += header.ttlSerializedSize(cell.ttl());
 
             if (cell.column().isComplex())
                 size += cell.column().cellPathSerializer().serializedSize(cell.path());
@@ -367,13 +365,13 @@ public class BufferCell extends AbstractCell
             boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
 
             if (!useRowTimestamp)
-                in.readVInt();
+                header.skipTimestamp(in);
 
             if (!useRowTTL && (isDeleted || isExpiring))
-                in.readVInt();
+                header.skipLocalDeletionTime(in);
 
             if (!useRowTTL && isExpiring)
-                in.readVInt();
+                header.skipTTL(in);
 
             if (column.isComplex())
                 column.cellPathSerializer().skip(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
new file mode 100644
index 0000000..ca62c47
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Stats used for the encoding of the rows and tombstones of a given source.
+ * <p>
+ * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
+ * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
+ * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
+ * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * <p>
+ * Note that due to their use, those stats can be somewhat inaccurate (the more inaccurate
+ * they are, the less effective the storage will be, but provided the stats are not completely wacky,
+ * this shouldn't have too huge an impact on performance) and in fact they will not always be
+ * accurate for reasons explained in {@link SerializationHeader#make}.
+ */
+public class EncodingStats
+{
+    // Default values for the timestamp, deletion time and ttl. We use these both for NO_STATS, but also to serialize
+    // an EncodingStats. Basically, we encode the diff of each value to these epochs, which gives values with better vint encoding.
+    private static final long TIMESTAMP_EPOCH;
+    private static final int DELETION_TIME_EPOCH;
+    private static final int TTL_EPOCH = 0;
+    static
+    {
+        // We want a fixed epoch, but one that provides small values when subtracted from our timestamps and deletion times.
+        // So we somewhat arbitrarily use the date of the summit 2015, which should hopefully roughly correspond to the 3.0 release.
+        Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT-0"), Locale.US);
+        c.set(Calendar.YEAR, 2015);
+        c.set(Calendar.MONTH, Calendar.SEPTEMBER);
+        c.set(Calendar.DAY_OF_MONTH, 22);
+        c.set(Calendar.HOUR_OF_DAY, 0);
+        c.set(Calendar.MINUTE, 0);
+        c.set(Calendar.SECOND, 0);
+        c.set(Calendar.MILLISECOND, 0);
+
+        TIMESTAMP_EPOCH = c.getTimeInMillis() * 1000; // timestamps should be in microseconds by convention
+        DELETION_TIME_EPOCH = (int)(c.getTimeInMillis() / 1000); // local deletion times are in seconds
+    }
+
+    // We should use this sparingly obviously
+    public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH, -1);
+
+    public static final Serializer serializer = new Serializer();
+
+    public final long minTimestamp;
+    public final int minLocalDeletionTime;
+    public final int minTTL;
+
+    // Will be < 0 if the value is unknown
+    public final int avgColumnSetPerRow;
+
+    public EncodingStats(long minTimestamp,
+                         int minLocalDeletionTime,
+                         int minTTL,
+                         int avgColumnSetPerRow)
+    {
+        // Note that the exact values of those don't impact correctness, just the efficiency of the encoding. So when we
+        // get a value for timestamp (resp. minLocalDeletionTime) that means 'no object had a timestamp' (resp. 'a local
+        // deletion time'), then what value we store for minTimestamp (resp. minLocalDeletionTime) doesn't matter, and
+        // it's thus more efficient to use our EPOCH numbers, since it will result in a guaranteed 1 byte encoding.
+
+        this.minTimestamp = minTimestamp == LivenessInfo.NO_TIMESTAMP ? TIMESTAMP_EPOCH : minTimestamp;
+        this.minLocalDeletionTime = minLocalDeletionTime == LivenessInfo.NO_EXPIRATION_TIME ? DELETION_TIME_EPOCH : minLocalDeletionTime;
+        this.minTTL = minTTL;
+        this.avgColumnSetPerRow = avgColumnSetPerRow;
+    }
+
+    /**
+     * Merge this stats with another one.
+     * <p>
+     * The comments of {@link SerializationHeader#make} apply here too, i.e. the result of
+     * merging will be not totally accurate but we can live with that.
+     */
+    public EncodingStats mergeWith(EncodingStats that)
+    {
+        long minTimestamp = this.minTimestamp == TIMESTAMP_EPOCH
+                          ? that.minTimestamp
+                          : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
+
+        int minDelTime = this.minLocalDeletionTime == DELETION_TIME_EPOCH
+                       ? that.minLocalDeletionTime
+                       : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+
+        int minTTL = this.minTTL == TTL_EPOCH
+                   ? that.minTTL
+                   : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
+
+        int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
+                               ? that.avgColumnSetPerRow
+                               : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
+
+        return new EncodingStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        EncodingStats that = (EncodingStats) o;
+
+        return this.avgColumnSetPerRow == that.avgColumnSetPerRow
+            && this.minLocalDeletionTime == that.minLocalDeletionTime
+            && this.minTTL == that.minTTL
+            && this.minTimestamp == that.minTimestamp;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+    }
+
+    public static class Collector implements PartitionStatisticsCollector
+    {
+        private boolean isTimestampSet;
+        private long minTimestamp = Long.MAX_VALUE;
+
+        private boolean isDelTimeSet;
+        private int minDeletionTime = Integer.MAX_VALUE;
+
+        private boolean isTTLSet;
+        private int minTTL = Integer.MAX_VALUE;
+
+        private boolean isColumnSetPerRowSet;
+        private long totalColumnsSet;
+        private long rows;
+
+        public void update(LivenessInfo info)
+        {
+            if (info.isEmpty())
+                return;
+
+            updateTimestamp(info.timestamp());
+
+            if (info.isExpiring())
+            {
+                updateTTL(info.ttl());
+                updateLocalDeletionTime(info.localExpirationTime());
+            }
+        }
+
+        public void update(Cell cell)
+        {
+            updateTimestamp(cell.timestamp());
+            if (cell.isExpiring())
+            {
+                updateTTL(cell.ttl());
+                updateLocalDeletionTime(cell.localDeletionTime());
+            }
+            else if (cell.isTombstone())
+            {
+                updateLocalDeletionTime(cell.localDeletionTime());
+            }
+        }
+
+        public void update(DeletionTime deletionTime)
+        {
+            if (deletionTime.isLive())
+                return;
+
+            updateTimestamp(deletionTime.markedForDeleteAt());
+            updateLocalDeletionTime(deletionTime.localDeletionTime());
+        }
+
+        public void updateTimestamp(long timestamp)
+        {
+            isTimestampSet = true;
+            minTimestamp = Math.min(minTimestamp, timestamp);
+        }
+
+        public void updateLocalDeletionTime(int deletionTime)
+        {
+            isDelTimeSet = true;
+            minDeletionTime = Math.min(minDeletionTime, deletionTime);
+        }
+
+        public void updateTTL(int ttl)
+        {
+            isTTLSet = true;
+            minTTL = Math.min(minTTL, ttl);
+        }
+
+        public void updateColumnSetPerRow(long columnSetInRow)
+        {
+            updateColumnSetPerRow(columnSetInRow, 1);
+        }
+
+        public void updateColumnSetPerRow(long totalColumnsSet, long rows)
+        {
+            if (totalColumnsSet < 0 || rows < 0)
+                return;
+
+            this.isColumnSetPerRowSet = true;
+            this.totalColumnsSet += totalColumnsSet;
+            this.rows += rows;
+        }
+
+        public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+        {
+            // We don't care about this but it comes with PartitionStatisticsCollector
+        }
+
+        public EncodingStats get()
+        {
+            return new EncodingStats(isTimestampSet ? minTimestamp : TIMESTAMP_EPOCH,
+                                     isDelTimeSet ? minDeletionTime : DELETION_TIME_EPOCH,
+                                     isTTLSet ? minTTL : TTL_EPOCH,
+                                     isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(EncodingStats stats, DataOutputPlus out) throws IOException
+        {
+            out.writeVInt(stats.minTimestamp - TIMESTAMP_EPOCH);
+            out.writeVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH);
+            out.writeVInt(stats.minTTL - TTL_EPOCH);
+            out.writeVInt(stats.avgColumnSetPerRow);
+        }
+
+        public int serializedSize(EncodingStats stats)
+        {
+            return TypeSizes.sizeofVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
+                 + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
+                 + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH)
+                 + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
+        }
+
+        public EncodingStats deserialize(DataInputPlus in) throws IOException
+        {
+            long minTimestamp = in.readVInt() + TIMESTAMP_EPOCH;
+            int minLocalDeletionTime = (int)in.readVInt() + DELETION_TIME_EPOCH;
+            int minTTL = (int)in.readVInt() + TTL_EPOCH;
+            int avgColumnSetPerRow = (int)in.readVInt();
+            return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
index 6241a89..8d81898 100644
--- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -83,7 +83,7 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIte
         return iterator.staticRow();
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         maybeInit();
         return iterator.stats();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
index 2a10199..389fe45 100644
--- a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
@@ -57,7 +57,7 @@ public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
                                        ColumnFilter selection,
                                        Row staticRow,
                                        boolean isReversed,
-                                       RowStats stats,
+                                       EncodingStats stats,
                                        Iterator<Row> rows,
                                        Iterator<RangeTombstone> ranges,
                                        boolean removeShadowedData)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/RowStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java
deleted file mode 100644
index 5b0d3bd..0000000
--- a/src/java/org/apache/cassandra/db/rows/RowStats.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.io.IOException;
-import java.util.Objects;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP;
-import static org.apache.cassandra.db.LivenessInfo.NO_TTL;
-import static org.apache.cassandra.db.LivenessInfo.NO_EXPIRATION_TIME;
-
-/**
- * General statistics on rows (and and tombstones) for a given source.
- * <p>
- * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
- * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
- * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
- * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
- * <p>
- * Note that due to their use, those stats can suffer to be somewhat inaccurate (the more incurrate
- * they are, the less effective the storage will be, but provided the stats are not completly wacky,
- * this shouldn't have too huge an impact on performance) and in fact they will not always be
- * accurate for reasons explained in {@link SerializationHeader#make}.
- */
-public class RowStats
-{
-    // We should use this sparingly obviously
-    public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_EXPIRATION_TIME, NO_TTL, -1);
-
-    public static final Serializer serializer = new Serializer();
-
-    public final long minTimestamp;
-    public final int minLocalDeletionTime;
-    public final int minTTL;
-
-    // Will be < 0 if the value is unknown
-    public final int avgColumnSetPerRow;
-
-    public RowStats(long minTimestamp,
-                    int minLocalDeletionTime,
-                    int minTTL,
-                    int avgColumnSetPerRow)
-    {
-        this.minTimestamp = minTimestamp;
-        this.minLocalDeletionTime = minLocalDeletionTime;
-        this.minTTL = minTTL;
-        this.avgColumnSetPerRow = avgColumnSetPerRow;
-    }
-
-    public boolean hasMinTimestamp()
-    {
-        return minTimestamp != NO_TIMESTAMP;
-    }
-
-    public boolean hasMinLocalDeletionTime()
-    {
-        return minLocalDeletionTime != NO_EXPIRATION_TIME;
-    }
-
-    /**
-     * Merge this stats with another one.
-     * <p>
-     * The comments of {@link SerializationHeader#make} applies here too, i.e. the result of
-     * merging will be not totally accurate but we can live with that.
-     */
-    public RowStats mergeWith(RowStats that)
-    {
-        long minTimestamp = this.minTimestamp == NO_TIMESTAMP
-                          ? that.minTimestamp
-                          : (that.minTimestamp == NO_TIMESTAMP ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
-
-        int minDelTime = this.minLocalDeletionTime == NO_EXPIRATION_TIME
-                       ? that.minLocalDeletionTime
-                       : (that.minLocalDeletionTime == NO_EXPIRATION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
-
-        int minTTL = this.minTTL == NO_TTL
-                   ? that.minTTL
-                   : (that.minTTL == NO_TTL ? this.minTTL : Math.min(this.minTTL, that.minTTL));
-
-        int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
-                               ? that.avgColumnSetPerRow
-                               : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
-
-        return new RowStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        RowStats rowStats = (RowStats) o;
-
-        if (avgColumnSetPerRow != rowStats.avgColumnSetPerRow) return false;
-        if (minLocalDeletionTime != rowStats.minLocalDeletionTime) return false;
-        if (minTTL != rowStats.minTTL) return false;
-        if (minTimestamp != rowStats.minTimestamp) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
-    }
-
-    public static class Collector implements PartitionStatisticsCollector
-    {
-        private boolean isTimestampSet;
-        private long minTimestamp = Long.MAX_VALUE;
-
-        private boolean isDelTimeSet;
-        private int minDeletionTime = Integer.MAX_VALUE;
-
-        private boolean isTTLSet;
-        private int minTTL = Integer.MAX_VALUE;
-
-        private boolean isColumnSetPerRowSet;
-        private long totalColumnsSet;
-        private long rows;
-
-        public void update(LivenessInfo info)
-        {
-            if (info.isEmpty())
-                return;
-
-            updateTimestamp(info.timestamp());
-
-            if (info.isExpiring())
-            {
-                updateTTL(info.ttl());
-                updateLocalDeletionTime(info.localExpirationTime());
-            }
-        }
-
-        public void update(Cell cell)
-        {
-            updateTimestamp(cell.timestamp());
-            updateTTL(cell.ttl());
-            updateLocalDeletionTime(cell.localDeletionTime());
-        }
-
-        public void updateTimestamp(long timestamp)
-        {
-            if (timestamp == NO_TIMESTAMP)
-                return;
-
-            isTimestampSet = true;
-            minTimestamp = Math.min(minTimestamp, timestamp);
-        }
-
-        public void updateLocalDeletionTime(int deletionTime)
-        {
-            if (deletionTime == NO_EXPIRATION_TIME)
-                return;
-
-            isDelTimeSet = true;
-            minDeletionTime = Math.min(minDeletionTime, deletionTime);
-        }
-
-        public void update(DeletionTime deletionTime)
-        {
-            if (deletionTime.isLive())
-                return;
-
-            updateTimestamp(deletionTime.markedForDeleteAt());
-            updateLocalDeletionTime(deletionTime.localDeletionTime());
-        }
-
-        public void updateTTL(int ttl)
-        {
-            if (ttl <= NO_TTL)
-                return;
-
-            isTTLSet = true;
-            minTTL = Math.min(minTTL, ttl);
-        }
-
-        public void updateColumnSetPerRow(long columnSetInRow)
-        {
-            updateColumnSetPerRow(columnSetInRow, 1);
-        }
-
-        public void updateColumnSetPerRow(long totalColumnsSet, long rows)
-        {
-            if (totalColumnsSet < 0 || rows < 0)
-                return;
-
-            this.isColumnSetPerRowSet = true;
-            this.totalColumnsSet += totalColumnsSet;
-            this.rows += rows;
-        }
-
-        public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
-        {
-            // We don't care about this but this come with PartitionStatisticsCollector
-        }
-
-        public RowStats get()
-        {
-            return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP,
-                                isDelTimeSet ? minDeletionTime : NO_EXPIRATION_TIME,
-                                isTTLSet ? minTTL : NO_TTL,
-                                isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
-        }
-    }
-
-    public static class Serializer
-    {
-        public void serialize(RowStats stats, DataOutputPlus out) throws IOException
-        {
-            out.writeVInt(stats.minTimestamp);
-            out.writeVInt(stats.minLocalDeletionTime);
-            out.writeVInt(stats.minTTL);
-            out.writeVInt(stats.avgColumnSetPerRow);
-        }
-
-        public int serializedSize(RowStats stats)
-        {
-            return TypeSizes.sizeofVInt(stats.minTimestamp)
-                 + TypeSizes.sizeofVInt(stats.minLocalDeletionTime)
-                 + TypeSizes.sizeofVInt(stats.minTTL)
-                 + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
-        }
-
-        public RowStats deserialize(DataInputPlus in) throws IOException
-        {
-            long minTimestamp = in.readVInt();
-            int minLocalDeletionTime = (int)in.readVInt();
-            int minTTL = (int)in.readVInt();
-            int avgColumnSetPerRow = (int)in.readVInt();
-            return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
index a3ecf6d..649fd8b 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
@@ -86,7 +86,7 @@ public interface UnfilteredRowIterator extends Iterator<Unfiltered>, AutoCloseab
      * performance reasons (for delta-encoding for instance) and code should not
      * expect those to be exact.
      */
-    public RowStats stats();
+    public EncodingStats stats();
 
     public void close();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index c998964..ec46751 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -117,7 +117,7 @@ public class UnfilteredRowIteratorSerializer
         SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic);
 
         if (!partitionDeletion.isLive())
-            writeDelTime(partitionDeletion, header, out);
+            header.writeDeletionTime(partitionDeletion, out);
 
         if (hasStatic)
             UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
@@ -153,7 +153,7 @@ public class UnfilteredRowIteratorSerializer
         size += SerializationHeader.serializer.serializedSizeForMessaging(header, hasStatic);
 
         if (!partitionDeletion.isLive())
-            size += delTimeSerializedSize(partitionDeletion, header);
+            size += header.deletionTimeSerializedSize(partitionDeletion);
 
         if (hasStatic)
             size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version);
@@ -175,7 +175,7 @@ public class UnfilteredRowIteratorSerializer
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)
         {
-            SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS);
+            SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
             return new Header(sh, key, isReversed, true, null, null, 0);
         }
 
@@ -185,7 +185,7 @@ public class UnfilteredRowIteratorSerializer
 
         SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic);
 
-        DeletionTime partitionDeletion = hasPartitionDeletion ? readDelTime(in, header) : DeletionTime.LIVE;
+        DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE;
 
         Row staticRow = Rows.EMPTY_STATIC_ROW;
         if (hasStatic)
@@ -226,31 +226,6 @@ public class UnfilteredRowIteratorSerializer
         return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag));
     }
 
-    public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
-    {
-        out.writeVInt(header.encodeTimestamp(dt.markedForDeleteAt()));
-        out.writeVInt(header.encodeDeletionTime(dt.localDeletionTime()));
-    }
-
-    public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header)
-    {
-        return TypeSizes.sizeofVInt(header.encodeTimestamp(dt.markedForDeleteAt()))
-             + TypeSizes.sizeofVInt(header.encodeDeletionTime(dt.localDeletionTime()));
-    }
-
-    public static DeletionTime readDelTime(DataInputPlus in, SerializationHeader header) throws IOException
-    {
-        long markedAt = header.decodeTimestamp(in.readVInt());
-        int localDelTime = header.decodeDeletionTime((int)in.readVInt());
-        return new DeletionTime(markedAt, localDelTime);
-    }
-
-    public static void skipDelTime(DataInputPlus in, SerializationHeader header) throws IOException
-    {
-        in.readVInt();
-        in.readVInt();
-    }
-
     public static class Header
     {
         public final SerializationHeader sHeader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 6b6ec67..5f110bb 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -126,9 +126,9 @@ public abstract class UnfilteredRowIterators
                 return Rows.EMPTY_STATIC_ROW;
             }
 
-            public RowStats stats()
+            public EncodingStats stats()
             {
-                return RowStats.NO_STATS;
+                return EncodingStats.NO_STATS;
             }
 
             public boolean hasNext()
@@ -475,9 +475,9 @@ public abstract class UnfilteredRowIterators
                  : new PartitionColumns(statics, regulars);
         }
 
-        private static RowStats mergeStats(List<UnfilteredRowIterator> iterators)
+        private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators)
         {
-            RowStats stats = RowStats.NO_STATS;
+            EncodingStats stats = EncodingStats.NO_STATS;
             for (UnfilteredRowIterator iter : iterators)
                 stats = stats.mergeWith(iter.stats());
             return stats;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 11fa800..358c841 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -125,14 +125,14 @@ public class UnfilteredSerializer
             Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
 
         if ((flags & HAS_TIMESTAMP) != 0)
-            out.writeLong(header.encodeTimestamp(pkLiveness.timestamp()));
+            header.writeTimestamp(pkLiveness.timestamp(), out);
         if ((flags & HAS_TTL) != 0)
         {
-            out.writeInt(header.encodeTTL(pkLiveness.ttl()));
-            out.writeInt(header.encodeDeletionTime(pkLiveness.localExpirationTime()));
+            header.writeTTL(pkLiveness.ttl(), out);
+            header.writeLocalDeletionTime(pkLiveness.localExpirationTime(), out);
         }
         if ((flags & HAS_DELETION) != 0)
-            UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out);
+            header.writeDeletionTime(deletion, out);
 
         Columns columns = header.columns(isStatic);
         int simpleCount = columns.simpleColumnCount();
@@ -174,7 +174,7 @@ public class UnfilteredSerializer
         }
 
         if (hasComplexDeletion)
-            UnfilteredRowIteratorSerializer.writeDelTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), header, out);
+            header.writeDeletionTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), out);
 
         if (data != null)
         {
@@ -194,12 +194,12 @@ public class UnfilteredSerializer
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
-            UnfilteredRowIteratorSerializer.writeDelTime(bm.endDeletionTime(), header, out);
-            UnfilteredRowIteratorSerializer.writeDelTime(bm.startDeletionTime(), header, out);
+            header.writeDeletionTime(bm.endDeletionTime(), out);
+            header.writeDeletionTime(bm.startDeletionTime(), out);
         }
         else
         {
-            UnfilteredRowIteratorSerializer.writeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime(), header, out);
+            header.writeDeletionTime(((RangeTombstoneBoundMarker)marker).deletionTime(), out);
         }
     }
 
@@ -223,14 +223,14 @@ public class UnfilteredSerializer
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
 
         if (!pkLiveness.isEmpty())
-            size += TypeSizes.sizeof(header.encodeTimestamp(pkLiveness.timestamp()));
+            size += header.timestampSerializedSize(pkLiveness.timestamp());
         if (pkLiveness.isExpiring())
         {
-            size += TypeSizes.sizeof(header.encodeTTL(pkLiveness.ttl()));
-            size += TypeSizes.sizeof(header.encodeDeletionTime(pkLiveness.localExpirationTime()));
+            size += header.ttlSerializedSize(pkLiveness.ttl());
+            size += header.localDeletionTimeSerializedSize(pkLiveness.localExpirationTime());
         }
         if (!deletion.isLive())
-            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header);
+            size += header.deletionTimeSerializedSize(deletion);
 
         Columns columns = header.columns(isStatic);
         int simpleCount = columns.simpleColumnCount();
@@ -274,7 +274,7 @@ public class UnfilteredSerializer
         }
 
         if (hasComplexDeletion)
-            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion(), header);
+            size += header.deletionTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion());
 
         if (data != null)
         {
@@ -293,12 +293,12 @@ public class UnfilteredSerializer
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
-            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.endDeletionTime(), header);
-            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.startDeletionTime(), header);
+            size += header.deletionTimeSerializedSize(bm.endDeletionTime());
+            size += header.deletionTimeSerializedSize(bm.startDeletionTime());
         }
         else
         {
-           size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(((RangeTombstoneBoundMarker)marker).deletionTime(), header);
+           size += header.deletionTimeSerializedSize(((RangeTombstoneBoundMarker)marker).deletionTime());
         }
         return size;
     }
@@ -350,9 +350,9 @@ public class UnfilteredSerializer
     throws IOException
     {
         if (bound.isBoundary())
-            return new RangeTombstoneBoundaryMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header));
+            return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
         else
-            return new RangeTombstoneBoundMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header));
+            return new RangeTombstoneBoundMarker(bound, header.readDeletionTime(in));
     }
 
     public Row deserializeRowBody(DataInputPlus in,
@@ -373,14 +373,14 @@ public class UnfilteredSerializer
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
             {
-                long timestamp = header.decodeTimestamp(in.readLong());
-                int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL;
-                int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_EXPIRATION_TIME;
+                long timestamp = header.readTimestamp(in);
+                int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL;
+                int localDeletionTime = hasTTL ? header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME;
                 rowLiveness = LivenessInfo.create(timestamp, ttl, localDeletionTime);
             }
 
             builder.addPrimaryKeyLivenessInfo(rowLiveness);
-            builder.addRowDeletion(hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE);
+            builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
 
             Columns columns = header.columns(isStatic);
             if (header.useSparseColumnLayout(isStatic))
@@ -443,7 +443,7 @@ public class UnfilteredSerializer
             helper.startOfComplexColumn(column);
             if (hasComplexDeletion)
             {
-                DeletionTime complexDeletion = UnfilteredRowIteratorSerializer.readDelTime(in, header);
+                DeletionTime complexDeletion = header.readDeletionTime(in);
                 if (!helper.isDroppedComplexDeletion(complexDeletion))
                     builder.addComplexDeletion(column, complexDeletion);
             }
@@ -474,15 +474,14 @@ public class UnfilteredSerializer
        // Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
         // the size we think due to VINT encoding
         if (hasTimestamp)
-            in.readLong();
+            header.skipTimestamp(in);
         if (hasTTL)
         {
-            // ttl and localDeletionTime
-            in.readInt();
-            in.readInt();
+            header.skipLocalDeletionTime(in);
+            header.skipTTL(in);
         }
         if (hasDeletion)
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            header.skipDeletionTime(in);
 
         Columns columns = header.columns(isStatic);
         if (header.useSparseColumnLayout(isStatic))
@@ -522,12 +521,12 @@ public class UnfilteredSerializer
     {
         if (isBoundary)
         {
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            header.skipDeletionTime(in);
+            header.skipDeletionTime(in);
         }
         else
         {
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            header.skipDeletionTime(in);
         }
     }
 
@@ -535,7 +534,7 @@ public class UnfilteredSerializer
     throws IOException
     {
         if (hasComplexDeletion)
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            header.skipDeletionTime(in);
 
         while (Cell.serializer.skip(in, column, header));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
index ff3f82c..84713eb 100644
--- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.NoSuchElementException;
-
 import com.google.common.collect.UnmodifiableIterator;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -72,7 +70,7 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         return wrapped.staticRow();
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         return wrapped.stats();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index e7560c2..174e634 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -21,23 +21,19 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
-import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Throwables;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -69,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
                                     0,
                                     ActiveRepairService.UNREPAIRED_SSTABLE,
-                                    new SerializationHeader(metadata, columns, RowStats.NO_STATS));
+                                    new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index f1af85c..23224ce 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -123,11 +123,11 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
         return filename;
     }
 
-    public RowStats stats()
+    public EncodingStats stats()
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new RowStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
     }
 
     public int compareTo(SSTableIdentityIterator o)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c055ab99/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index ef3bde1..db6ed42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -29,17 +28,13 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -70,7 +65,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     {
         super(directory, metadata, partitioner, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(metadata, columns, RowStats.NO_STATS);
+        this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();
     }