You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/04/28 04:00:17 UTC

[1/2] git commit: Add varint encoding to Serializing Cache patch by Vijay; reviewed by jbellis,xedin for CASSANDRA-4138

Updated Branches:
  refs/heads/trunk c6f95117d -> cb25a8fcd


Add varint encoding to Serializing Cache
patch by Vijay; reviewed by jbellis,xedin for CASSANDRA-4138


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb25a8fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb25a8fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb25a8fc

Branch: refs/heads/trunk
Commit: cb25a8fcdd9626c6b29a3b1a94dfda5fd0375928
Parents: c6f9511
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri Apr 27 18:56:41 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri Apr 27 18:56:41 2012 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/cache/KeyCacheKey.java    |    4 +-
 .../org/apache/cassandra/cache/RowCacheKey.java    |    4 +-
 .../apache/cassandra/cache/SerializingCache.java   |   12 +-
 .../cassandra/cache/SerializingCacheProvider.java  |   14 +-
 src/java/org/apache/cassandra/db/Column.java       |   10 +-
 src/java/org/apache/cassandra/db/ColumnFamily.java |   23 +-
 .../cassandra/db/ColumnFamilySerializer.java       |   24 +-
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   13 +-
 .../org/apache/cassandra/db/ColumnSerializer.java  |    4 +-
 .../org/apache/cassandra/db/CounterColumn.java     |    4 +-
 .../org/apache/cassandra/db/CounterMutation.java   |    3 +-
 src/java/org/apache/cassandra/db/DBConstants.java  |   27 --
 src/java/org/apache/cassandra/db/DBTypeSizes.java  |  106 +++++
 .../org/apache/cassandra/db/ExpiringColumn.java    |    4 +-
 src/java/org/apache/cassandra/db/IColumn.java      |    4 +-
 src/java/org/apache/cassandra/db/Memtable.java     |    2 +-
 .../org/apache/cassandra/db/RangeSliceReply.java   |    5 +-
 src/java/org/apache/cassandra/db/ReadResponse.java |   10 +-
 src/java/org/apache/cassandra/db/Row.java          |    3 +-
 .../org/apache/cassandra/db/RowIndexEntry.java     |    7 +-
 src/java/org/apache/cassandra/db/RowMutation.java  |   13 +-
 src/java/org/apache/cassandra/db/RowPosition.java  |   16 +-
 .../cassandra/db/SliceByNamesReadCommand.java      |   19 +-
 .../apache/cassandra/db/SliceFromReadCommand.java  |   22 +-
 src/java/org/apache/cassandra/db/SuperColumn.java  |   24 +-
 .../org/apache/cassandra/db/WriteResponse.java     |    9 +-
 .../cassandra/db/commitlog/ReplayPosition.java     |    3 +-
 .../db/compaction/LazilyCompactedRow.java          |    2 +-
 .../cassandra/db/context/CounterContext.java       |   10 +-
 .../org/apache/cassandra/db/filter/QueryPath.java  |   42 ++-
 src/java/org/apache/cassandra/dht/Token.java       |    3 +-
 src/java/org/apache/cassandra/io/ISerializer.java  |    4 +-
 .../apache/cassandra/io/sstable/IndexHelper.java   |    9 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java    |    4 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java |    4 +-
 .../cassandra/io/util/AbstractDataInput.java       |    8 +-
 .../cassandra/io/util/AbstractDataOutput.java      |  301 +++++++++++++++
 .../cassandra/utils/BloomFilterSerializer.java     |   22 +-
 .../org/apache/cassandra/utils/ByteBufferUtil.java |    6 +-
 .../apache/cassandra/utils/EstimatedHistogram.java |    3 +-
 .../org/apache/cassandra/utils/FilterFactory.java  |    8 +-
 .../apache/cassandra/utils/StreamingHistogram.java |    4 +-
 .../utils/vint/EncodedDataInputStream.java         |  115 ++++++
 .../utils/vint/EncodedDataOutputStream.java        |   98 +++++
 test/unit/org/apache/cassandra/Util.java           |    5 +
 .../apache/cassandra/utils/EncodedStreamsTest.java |  160 ++++++++
 46 files changed, 1031 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/KeyCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index df23928..830fcdb 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.Arrays;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -52,7 +52,7 @@ public class KeyCacheKey implements CacheKey
 
     public int serializedSize()
     {
-        return key.length + DBConstants.INT_SIZE;
+        return DBTypeSizes.NATIVE.sizeof(key.length) + key.length;
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index 2c15faf..07daeb2 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -58,7 +58,7 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
 
     public int serializedSize()
     {
-        return key.length + DBConstants.INT_SIZE;
+        return key.length + DBTypeSizes.NATIVE.sizeof(key.length);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 1dac92e..3b641e8 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.cache;
 
-import java.io.DataOutputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.util.Set;
@@ -27,9 +26,12 @@ import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import com.googlecode.concurrentlinkedhashmap.Weigher;
 import com.googlecode.concurrentlinkedhashmap.Weighers;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.apache.cassandra.utils.vint.EncodedDataInputStream;
+import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +42,8 @@ import org.slf4j.LoggerFactory;
 public class SerializingCache<K, V> implements ICache<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
+    private static final DBTypeSizes ENCODED_TYPE_SIZES = DBTypeSizes.VINT;
+
     private static final int DEFAULT_CONCURENCY_LEVEL = 64;
 
     private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
@@ -82,7 +86,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
     {
         try
         {
-            return serializer.deserialize(new MemoryInputStream(mem));
+            return serializer.deserialize(new EncodedDataInputStream(new MemoryInputStream(mem)));
         }
         catch (IOException e)
         {
@@ -93,7 +97,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
 
     private FreeableMemory serialize(V value)
     {
-        long serializedSize = serializer.serializedSize(value);
+        long serializedSize = serializer.serializedSize(value, ENCODED_TYPE_SIZES);
         if (serializedSize > Integer.MAX_VALUE)
             throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes");
 
@@ -109,7 +113,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
 
         try
         {
-            serializer.serialize(value, new DataOutputStream(new MemoryOutputStream(freeableMemory)));
+            serializer.serialize(value, new EncodedDataOutputStream(new MemoryOutputStream(freeableMemory)));
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index d3e8744..4ddc819 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -23,7 +23,7 @@ import java.io.IOError;
 import java.io.IOException;
 
 import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 
 public class SerializingCacheProvider implements IRowCacheProvider
@@ -60,12 +60,14 @@ public class SerializingCacheProvider implements IRowCacheProvider
             return ColumnFamily.serializer.deserialize(in);
         }
 
-        public long serializedSize(IRowCacheEntry cf)
+        public long serializedSize(IRowCacheEntry cf, DBTypeSizes typeSizes)
         {
-            return DBConstants.BOOL_SIZE
-                   + (cf instanceof RowCacheSentinel
-                      ? DBConstants.INT_SIZE + DBConstants.LONG_SIZE
-                      : ColumnFamily.serializer().serializedSize((ColumnFamily) cf));
+            int size = typeSizes.sizeof(true);
+            if (cf instanceof RowCacheSentinel)
+                size += typeSizes.sizeof(((RowCacheSentinel) cf).sentinelId);
+            else
+                size += ColumnFamily.serializer().serializedSize((ColumnFamily) cf, typeSizes);
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index 5f57af5..a1d89d8 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -124,7 +124,7 @@ public class Column implements IColumn
         return timestamp;
     }
 
-    public int size()
+    public int size(DBTypeSizes typeSizes)
     {
         /*
          * Size of a column is =
@@ -134,16 +134,18 @@ public class Column implements IColumn
          * + 4 bytes which basically indicates the size of the byte array
          * + entire byte array.
         */
-        return DBConstants.SHORT_SIZE + name.remaining() + 1 + DBConstants.TIMESTAMP_SIZE + DBConstants.INT_SIZE + value.remaining();
+        int nameSize = name.remaining();
+        int valueSize = value.remaining();
+        return typeSizes.sizeof((short) nameSize) + nameSize + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
     }
 
     /*
      * This returns the size of the column when serialized.
      * @see com.facebook.infrastructure.db.IColumn#serializedSize()
     */
-    public int serializedSize()
+    public int serializedSize(DBTypeSizes typeSizes)
     {
-        return size();
+        return size(typeSizes);
     }
 
     public int serializationFlags()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index b00f168..1c5fd53 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import static org.apache.cassandra.db.DBConstants.*;
-
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
@@ -256,12 +254,12 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         return null;
     }
 
-    int size()
+    int size(DBTypeSizes typeSizes)
     {
         int size = 0;
         for (IColumn column : columns)
         {
-            size += column.size();
+            size += column.size(typeSizes);
         }
         return size;
     }
@@ -353,23 +351,6 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         addAll(cf, allocator);
     }
 
-    public long serializedSize()
-    {
-        return BOOL_SIZE // nullness bool
-               + INT_SIZE // id
-               + serializedSizeForSSTable();
-    }
-
-    public long serializedSizeForSSTable()
-    {
-        int size = INT_SIZE // local deletion time
-                 + LONG_SIZE // client deletion time
-                 + INT_SIZE; // column count
-        for (IColumn column : columns)
-            size += column.serializedSize();
-        return size;
-    }
-
     /**
      * Goes over all columns and check the fields are valid (as far as we can
      * tell).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 53b2183..927a8f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.ColumnStats;
 
 public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
 {
@@ -140,8 +139,27 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
         return cf;
     }
 
-    public long serializedSize(ColumnFamily cf)
+    public long serializedSize(ColumnFamily cf, DBTypeSizes type)
     {
-        return cf == null ? DBConstants.BOOL_SIZE : cf.serializedSize();
+        if (cf == null)
+        {
+            return type.sizeof(false);
+        }
+        else
+        {
+            return type.sizeof(true) +      /* nullness bool */
+                   type.sizeof(cf.id()) +   /* id */
+                   serializedSizeForSSTable(cf, type);
+        }
+    }
+
+    public long serializedSizeForSSTable(ColumnFamily cf, DBTypeSizes typeSizes)
+    {
+        int size = typeSizes.sizeof(cf.getLocalDeletionTime()) // local deletion time
+                 + typeSizes.sizeof(cf.getMarkedForDeleteAt()) // client deletion time
+                 + typeSizes.sizeof(cf.getColumnCount()); // column count
+        for (IColumn column : cf.columns)
+            size += column.serializedSize(typeSizes);
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index b4c2f90..98a9a4a 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -73,10 +73,13 @@ public class ColumnIndex
          */
         private static long rowHeaderSize(ByteBuffer key)
         {
-            return DBConstants.SHORT_SIZE + key.remaining()     // Row key
-                 + DBConstants.LONG_SIZE                        // Row data size
-                 + DBConstants.INT_SIZE + DBConstants.LONG_SIZE // Deletion info
-                 + DBConstants.INT_SIZE;                        // Column count
+            DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+            // TODO fix constantSize when changing the nativeconststs.
+            int keysize = key.remaining();
+            return typeSizes.sizeof((short) keysize) + keysize + // Row key
+                 + typeSizes.sizeof(0L)                        // Row data size
+                 + typeSizes.sizeof(0) + typeSizes.sizeof(0L) // Deletion info
+                 + typeSizes.sizeof(0);                        // Column count
         }
 
         /**
@@ -110,7 +113,7 @@ public class ColumnIndex
                 startPosition = endPosition;
             }
 
-            endPosition += column.serializedSize();
+            endPosition += column.serializedSize(DBTypeSizes.NATIVE);
 
             // if we hit the column index size that we have to index after, go ahead and index it.
             if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 53680e4..1be3eb2 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -121,9 +121,9 @@ public class ColumnSerializer implements IColumnSerializer
         }
     }
 
-    public long serializedSize(IColumn object)
+    public long serializedSize(IColumn object, DBTypeSizes type)
     {
-        return object.serializedSize();
+        return object.serializedSize(type);
     }
 
     private static class CorruptColumnException extends IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 369e183..6390f25 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -93,13 +93,13 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public int size()
+    public int size(DBTypeSizes typeSizes)
     {
         /*
          * A counter column adds to a Column :
          *  + 8 bytes for timestampOfLastDelete
          */
-        return super.size() + DBConstants.TIMESTAMP_SIZE;
+        return super.size(typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 7ee9d80..ccc0e0a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -186,7 +186,8 @@ class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
 
     public long serializedSize(CounterMutation cm, int version)
     {
+        int tableSize = FBUtilities.encodedUTF8Length(cm.consistency().name());
         return RowMutation.serializer().serializedSize(cm.rowMutation(), version)
-               + DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(cm.consistency().name());
+               + DBTypeSizes.NATIVE.sizeof((short) tableSize) + tableSize;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/DBConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DBConstants.java b/src/java/org/apache/cassandra/db/DBConstants.java
deleted file mode 100644
index cc8245c..0000000
--- a/src/java/org/apache/cassandra/db/DBConstants.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public class DBConstants
-{
-    public static final int BOOL_SIZE = 1;
-    public static final int INT_SIZE = 4;
-    public static final int LONG_SIZE = 8;
-    public static final int SHORT_SIZE = 2;
-    public static final int TIMESTAMP_SIZE = 8;
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/DBTypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DBTypeSizes.java b/src/java/org/apache/cassandra/db/DBTypeSizes.java
new file mode 100644
index 0000000..3bf982e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DBTypeSizes.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public abstract class DBTypeSizes
+{
+    public static final DBTypeSizes NATIVE = new NativeDBTypeSizes();
+    public static final DBTypeSizes VINT = new VIntEncodedDBTypeSizes();
+
+    private static final int BOOL_SIZE = 1;
+    private static final int SHORT_SIZE = 2;
+    private static final int INT_SIZE = 4;
+    private static final int LONG_SIZE = 8;
+
+    public abstract int sizeof(boolean value);
+    public abstract int sizeof(short value);
+    public abstract int sizeof(int value);
+    public abstract int sizeof(long value);
+
+    public static class NativeDBTypeSizes extends DBTypeSizes
+    {
+        public int sizeof(boolean value)
+        {
+            return BOOL_SIZE;
+        }
+
+        public int sizeof(short value)
+        {
+            return SHORT_SIZE;
+        }
+
+        public int sizeof(int value)
+        {
+            return INT_SIZE;
+        }
+
+        public int sizeof(long value)
+        {
+            return LONG_SIZE;
+        }
+    }
+
+    public static class VIntEncodedDBTypeSizes extends DBTypeSizes
+    {
+        private static final int BOOL_SIZE = 1;
+
+        public int sizeofVInt(long i)
+        {
+            if (i >= -112 && i <= 127)
+                return 1;
+
+            int size = 0;
+            int len = -112;
+            if (i < 0)
+            {
+                i ^= -1L; // take one's complement'
+                len = -120;
+            }
+            long tmp = i;
+            while (tmp != 0)
+            {
+                tmp = tmp >> 8;
+                len--;
+            }
+            size++;
+            len = (len < -120) ? -(len + 120) : -(len + 112);
+            size += len;
+            return size;
+        }
+
+        public int sizeof(long i)
+        {
+            return sizeofVInt(i);
+        }
+
+        public int sizeof(boolean i)
+        {
+            return BOOL_SIZE;
+        }
+
+        public int sizeof(short i)
+        {
+            return sizeofVInt(i);
+        }
+
+        public int sizeof(int i)
+        {
+            return sizeofVInt(i);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 03e41d2..0272d5c 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -74,14 +74,14 @@ public class ExpiringColumn extends Column
     }
 
     @Override
-    public int size()
+    public int size(DBTypeSizes typeSizes)
     {
         /*
          * An expired column adds to a Column :
          *    4 bytes for the localExpirationTime
          *  + 4 bytes for the timeToLive
         */
-        return super.size() + DBConstants.INT_SIZE + DBConstants.INT_SIZE;
+        return super.size(typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/IColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumn.java b/src/java/org/apache/cassandra/db/IColumn.java
index eb2e270..18c2335 100644
--- a/src/java/org/apache/cassandra/db/IColumn.java
+++ b/src/java/org/apache/cassandra/db/IColumn.java
@@ -42,8 +42,8 @@ public interface IColumn
     public long mostRecentLiveChangeAt();
     public long mostRecentNonGCableChangeAt(int gcbefore);
     public ByteBuffer name();
-    public int size();
-    public int serializedSize();
+    public int size(DBTypeSizes typeSizes);
+    public int serializedSize(DBTypeSizes typeSizes);
     public int serializationFlags();
     public long timestamp();
     public ByteBuffer value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 06d191f..3a3fc31 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -225,7 +225,7 @@ public class Memtable
 
     private void resolve(DecoratedKey key, ColumnFamily cf)
     {
-        currentThroughput.addAndGet(cf.size());
+        currentThroughput.addAndGet(cf.size(DBTypeSizes.NATIVE));
         currentOperations.addAndGet((cf.getColumnCount() == 0)
                                     ? cf.isMarkedForDelete() ? 1 : 0
                                     : cf.getColumnCount());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index 8976f0e..c563c9c 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -40,12 +40,13 @@ public class RangeSliceReply
 
     public Message getReply(Message originalMessage) throws IOException
     {
-        int size = DBConstants.INT_SIZE;
+        int rowCount = rows.size();
+        int size = DBTypeSizes.NATIVE.sizeof(rowCount);
         for (Row row : rows)
             size += Row.serializer().serializedSize(row, originalMessage.getVersion());
 
         DataOutputBuffer buffer = new DataOutputBuffer(size);
-        buffer.writeInt(rows.size());
+        buffer.writeInt(rowCount);
         for (Row row : rows)
             Row.serializer().serialize(row, buffer, originalMessage.getVersion());
         assert buffer.getLength() == buffer.getData().length;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 9480380..77c9209 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -113,11 +113,11 @@ class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
 
     public long serializedSize(ReadResponse response, int version)
     {
-        int size = DBConstants.INT_SIZE;
-        size += DBConstants.BOOL_SIZE;
-        if (response.isDigestQuery())
-            size += response.digest().remaining();
-        else
+        DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+        ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        int size = typeSizes.sizeof(buffer.remaining());
+        size += typeSizes.sizeof(response.isDigestQuery());
+        if (!response.isDigestQuery())
             size += Row.serializer().serializedSize(response.row(), version);
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index dc003f9..16e50d4 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -79,7 +79,8 @@ public class Row
 
         public long serializedSize(Row row, int version)
         {
-            return DBConstants.SHORT_SIZE + row.key.key.remaining() + ColumnFamily.serializer().serializedSize(row.cf);
+            int keySize = row.key.key.remaining();
+            return DBTypeSizes.NATIVE.sizeof((short) keySize) + keySize + ColumnFamily.serializer().serializedSize(row.cf, DBTypeSizes.NATIVE);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index aea3c2f..307ce8b 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -183,10 +183,11 @@ public class RowIndexEntry
 
         public int serializedSize()
         {
-            int size = DBConstants.LONG_SIZE + DBConstants.INT_SIZE; // deletion info
-            size += DBConstants.INT_SIZE; // number of entries
+            DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+            int size = typeSizes.sizeof(deletionInfo.localDeletionTime) + typeSizes.sizeof(deletionInfo.markedForDeleteAt); // deletion info
+            size += typeSizes.sizeof(columnsIndex.size()); // number of entries
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += info.serializedSize();
+                size += info.serializedSize(typeSizes);
 
             return size + (int) FilterFactory.serializedSize(bloomFilter);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index fb84e39..f731b98 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -446,14 +446,17 @@ public class RowMutation implements IMutation, MessageProducer
 
         public long serializedSize(RowMutation rm, int version)
         {
-            int size = DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(rm.getTable());
-            size += DBConstants.SHORT_SIZE + rm.key().remaining();
+            DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+            int tableSize = FBUtilities.encodedUTF8Length(rm.getTable());
+            int keySize = rm.key().remaining();
+            int size = typeSizes.sizeof((short) tableSize) + tableSize;
+            size += typeSizes.sizeof((short) keySize) + keySize;
 
-            size += DBConstants.INT_SIZE;
+            size += typeSizes.sizeof(rm.modifications.size());
             for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications.entrySet())
             {
-                size += DBConstants.INT_SIZE;
-                size += entry.getValue().serializedSize();
+                size += typeSizes.sizeof(entry.getKey());
+                size += ColumnFamily.serializer.serializedSize(entry.getValue(), DBTypeSizes.NATIVE);
             }
 
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
index e7a45ca..fd80f6b 100644
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ b/src/java/org/apache/cassandra/db/RowPosition.java
@@ -100,12 +100,20 @@ public abstract class RowPosition implements RingPosition<RowPosition>
             }
         }
 
-        public long serializedSize(RowPosition pos)
+        public long serializedSize(RowPosition pos, DBTypeSizes typeSizes)
         {
             Kind kind = pos.kind();
-            return DBConstants.BOOL_SIZE
-                + (kind == Kind.ROW_KEY ? DBConstants.SHORT_SIZE + ((DecoratedKey)pos).key.remaining()
-                                        : Token.serializer().serializedSize(pos.getToken()));
+            int size = 1; // 1 byte for enum
+            if (kind == Kind.ROW_KEY)
+            {
+                int keySize = ((DecoratedKey)pos).key.remaining();
+                size += (typeSizes.sizeof((short) keySize) + keySize);
+            }
+            else
+            {
+                Token.serializer().serializedSize(pos.getToken(), typeSizes);
+            }
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 13d4174..13d2995 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -110,16 +110,23 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
     public long serializedSize(ReadCommand cmd, int version)
     {
+        DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
         SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
-        int size = DBConstants.BOOL_SIZE;
-        size += DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(command.table);
-        size += DBConstants.SHORT_SIZE + command.key.remaining();
-        size += command.queryPath.serializedSize();
-        size += DBConstants.INT_SIZE;
+        int size = typeSizes.sizeof(command.isDigestQuery());
+        int tableSize = FBUtilities.encodedUTF8Length(command.table);
+        int keySize = command.key.remaining();
+
+        size += typeSizes.sizeof((short) tableSize) + tableSize;
+        size += typeSizes.sizeof(keySize) + keySize;
+        size += command.queryPath.serializedSize(typeSizes);
+        size += typeSizes.sizeof(command.columnNames.size());
         if (!command.columnNames.isEmpty())
         {
             for (ByteBuffer cName : command.columnNames)
-                size += DBConstants.SHORT_SIZE + cName.remaining();
+            {
+                int cNameSize = cName.remaining();
+                size += typeSizes.sizeof((short) cNameSize) + cNameSize;
+            }
         }
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 47f0b3a..6f6b809 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -184,15 +184,21 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
 
     public long serializedSize(ReadCommand cmd, int version)
     {
+        DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
         SliceFromReadCommand command = (SliceFromReadCommand) cmd;
-        int size = DBConstants.BOOL_SIZE;
-        size += DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(command.table);
-        size += DBConstants.SHORT_SIZE + command.key.remaining();
-        size += command.queryPath.serializedSize();
-        size += DBConstants.SHORT_SIZE + command.start.remaining();
-        size += DBConstants.SHORT_SIZE + command.finish.remaining();
-        size += DBConstants.BOOL_SIZE;
-        size += DBConstants.INT_SIZE;
+        int tableSize = FBUtilities.encodedUTF8Length(command.table);
+        int keySize = command.key.remaining();
+        int startSize = command.start.remaining();
+        int finishSize = command.finish.remaining();
+
+        int size = typeSizes.sizeof(cmd.isDigestQuery()); // boolean
+        size += typeSizes.sizeof((short) tableSize) + tableSize;
+        size += typeSizes.sizeof((short) keySize) + keySize;
+        size += command.queryPath.serializedSize(typeSizes);
+        size += typeSizes.sizeof((short) startSize) + startSize;
+        size += typeSizes.sizeof((short) finishSize) + finishSize;
+        size += typeSizes.sizeof(command.reversed);
+        size += typeSizes.sizeof(command.count);
         return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
index 70ccca7..30ab43a 100644
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ b/src/java/org/apache/cassandra/db/SuperColumn.java
@@ -102,12 +102,12 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
     /**
      * This calculates the exact size of the sub columns on the fly
      */
-    public int size()
+    public int size(DBTypeSizes typeSizes)
     {
         int size = 0;
         for (IColumn subColumn : getSubColumns())
         {
-            size += subColumn.serializedSize();
+            size += subColumn.serializedSize(typeSizes);
         }
         return size;
     }
@@ -116,13 +116,25 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
      * This returns the size of the super-column when serialized.
      * @see org.apache.cassandra.db.IColumn#serializedSize()
     */
-    public int serializedSize()
+    public int serializedSize(DBTypeSizes typeSizes)
     {
         /*
          * We need to keep the way we are calculating the column size in sync with the
          * way we are calculating the size for the column family serializer.
+         *
+         * 2 bytes for name size
+         * n bytes for the name
+         * 4 bytes for getLocalDeletionTime
+         * 8 bytes for getMarkedForDeleteAt
+         * 4 bytes for the subcolumns size
+         * size(constantSize) of subcolumns.
          */
-        return DBConstants.SHORT_SIZE + name.remaining() + DBConstants.INT_SIZE + DBConstants.LONG_SIZE + DBConstants.INT_SIZE + size();
+        int nameSize = name.remaining();
+        int subColumnsSize = size(typeSizes);
+        return typeSizes.sizeof((short) nameSize) + nameSize
+                + typeSizes.sizeof(getLocalDeletionTime())
+                + typeSizes.sizeof(getMarkedForDeleteAt())
+                + typeSizes.sizeof(subColumnsSize) + subColumnsSize;
     }
 
     public long timestamp()
@@ -400,8 +412,8 @@ class SuperColumnSerializer implements IColumnSerializer
         return superColumn;
     }
 
-    public long serializedSize(IColumn object)
+    public long serializedSize(IColumn object, DBTypeSizes typeSizes)
     {
-        return object.serializedSize();
+        return object.serializedSize(typeSizes);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 2814038..f9bba85 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -93,9 +93,12 @@ public class WriteResponse
 
         public long serializedSize(WriteResponse response, int version)
         {
-            int size = DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(response.table());
-            size += DBConstants.SHORT_SIZE + response.key().remaining();
-            size += DBConstants.BOOL_SIZE;
+            DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+            int utfSize = FBUtilities.encodedUTF8Length(response.table());
+            int keySize = response.key().remaining();
+            int size = typeSizes.sizeof((short) utfSize) + utfSize;
+            size += typeSizes.sizeof((short) keySize) + keySize;
+            size += typeSizes.sizeof(response.isSuccess());
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 93b7283..b4f709e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 
@@ -129,7 +130,7 @@ public class ReplayPosition implements Comparable<ReplayPosition>
             return new ReplayPosition(dis.readLong(), dis.readInt());
         }
 
-        public long serializedSize(ReplayPosition object)
+        public long serializedSize(ReplayPosition object, DBTypeSizes typeSizes)
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 6eae79c..9990851 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -238,7 +238,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
             IColumn reduced = purged.iterator().next();
             container.clear();
 
-            serializedSize += reduced.serializedSize();
+            serializedSize += reduced.serializedSize(DBTypeSizes.NATIVE);
             columns++;
             maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
             int deletionTime = reduced.getLocalDeletionTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 40c93b4..3f18351 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,10 +70,10 @@ import org.apache.cassandra.utils.NodeId;
  */
 public class CounterContext implements IContext
 {
-    private static final int HEADER_SIZE_LENGTH = DBConstants.SHORT_SIZE;
-    private static final int HEADER_ELT_LENGTH = DBConstants.SHORT_SIZE;
-    private static final int CLOCK_LENGTH = DBConstants.LONG_SIZE;
-    private static final int COUNT_LENGTH = DBConstants.LONG_SIZE;
+    private static final int HEADER_SIZE_LENGTH = DBTypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
+    private static final int HEADER_ELT_LENGTH = DBTypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
+    private static final int CLOCK_LENGTH = DBTypeSizes.NATIVE.sizeof(Long.MAX_VALUE);
+    private static final int COUNT_LENGTH = DBTypeSizes.NATIVE.sizeof(Long.MAX_VALUE);
     private static final int STEP_LENGTH = NodeId.LENGTH + CLOCK_LENGTH + COUNT_LENGTH;
 
     private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/filter/QueryPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java
index 31b39d0..b47e9f1 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryPath.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryPath.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.filter;
 import java.io.*;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.ColumnPath;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -94,11 +94,43 @@ public class QueryPath
                              cName.remaining() == 0 ? null : cName);
     }
 
-    public int serializedSize()
+    public int serializedSize(DBTypeSizes typeSizes)
     {
-        int size = DBConstants.SHORT_SIZE + (columnFamilyName == null ? 0 : FBUtilities.encodedUTF8Length(columnFamilyName));
-        size += DBConstants.SHORT_SIZE + (superColumnName == null ? 0 : superColumnName.remaining());
-        size += DBConstants.SHORT_SIZE + (columnName == null ? 0 : columnName.remaining());
+        int size = 0;
+
+        if (columnFamilyName == null)
+        {
+            size += typeSizes.sizeof((short) 0);
+        }
+        else
+        {
+            int cfNameSize = FBUtilities.encodedUTF8Length(columnFamilyName);
+            size += typeSizes.sizeof((short) cfNameSize);
+            size += cfNameSize;
+        }
+
+        if (superColumnName == null)
+        {
+            size += typeSizes.sizeof((short) 0);
+        }
+        else
+        {
+            int scNameSize = superColumnName.remaining();
+            size += typeSizes.sizeof((short) scNameSize);
+            size += scNameSize;
+        }
+
+        if (columnName == null)
+        {
+            size += typeSizes.sizeof((short) 0);
+        }
+        else
+        {
+            int cNameSize = columnName.remaining();
+            size += typeSizes.sizeof((short) cNameSize);
+            size += cNameSize;
+        }
+
         return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 045d8a2..bf80b3c 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.service.StorageService;
@@ -102,7 +103,7 @@ public abstract class Token<T> implements RingPosition<Token<T>>, Serializable
             return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
         }
 
-        public long serializedSize(Token object)
+        public long serializedSize(Token object, DBTypeSizes typeSizes)
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/ISerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISerializer.java b/src/java/org/apache/cassandra/io/ISerializer.java
index fc67dec..08ec20f 100644
--- a/src/java/org/apache/cassandra/io/ISerializer.java
+++ b/src/java/org/apache/cassandra/io/ISerializer.java
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.db.DBTypeSizes;
+
 public interface ISerializer<T>
 {
     /**
@@ -39,5 +41,5 @@ public interface ISerializer<T>
      */
     public T deserialize(DataInput dis) throws IOException;
 
-    public long serializedSize(T t);
+    public long serializedSize(T t, DBTypeSizes type);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 5c7d645..2965cbf 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
@@ -199,9 +200,13 @@ public class IndexHelper
             dos.writeLong(width);
         }
 
-        public int serializedSize()
+        public int serializedSize(DBTypeSizes typeSizes)
         {
-            return 2 + firstName.remaining() + 2 + lastName.remaining() + 8 + 8;
+            int firstNameSize = firstName.remaining();
+            int lastNameSize = lastName.remaining();
+            return typeSizes.sizeof((short) firstNameSize) + firstNameSize +
+                   typeSizes.sizeof((short) lastNameSize) + lastNameSize +
+                   typeSizes.sizeof(offset) + typeSizes.sizeof(width);
         }
 
         public static IndexInfo deserialize(DataInput dis) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 5c00f5d..b3f08e1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -88,7 +88,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
     {
-        currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;
+        currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, DBTypeSizes.NATIVE) * 1.2;
 
         if (currentSize > bufferSize)
             sync();
@@ -107,7 +107,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         {
             // We will reuse a CF that we have counted already. But because it will be easier to add the full size
             // of the CF in the next writeRow call than to find out the delta, we just remove the size until that next call
-            currentSize -= currentKey.key.remaining() + previous.serializedSize() * 1.2;
+            currentSize -= currentKey.key.remaining() + ColumnFamily.serializer.serializedSize(previous, DBTypeSizes.NATIVE) * 1.2;
         }
         return previous;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d505151..522fb3b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.regex.Pattern;
 
@@ -49,6 +48,7 @@ public class SSTableWriter extends SSTable
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
     private final SSTableMetadata.Collector sstableMetadataCollector;
+    private final DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
 
     public SSTableWriter(String filename, long keyCount) throws IOException
     {
@@ -175,7 +175,7 @@ public class SSTableWriter extends SSTable
         ColumnIndex index = new ColumnIndex.Builder(cf.getComparator(), decoratedKey.key, cf.getColumnCount()).build(cf);
 
         // write out row size + data
-        dataFile.stream.writeLong(cf.serializedSizeForSSTable());
+        dataFile.stream.writeLong(ColumnFamily.serializer().serializedSizeForSSTable(cf, typeSizes));
         ColumnFamily.serializer().serializeForSSTable(cf, dataFile.stream);
 
         afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index b42eb81..b0766a4 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -187,7 +187,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
      * @throws IOException
      *             if this file is closed or another I/O error occurs.
      */
-    public final int readInt() throws IOException {
+    public int readInt() throws IOException {
         byte[] buffer = new byte[4];
         if (read(buffer, 0, buffer.length) != buffer.length) {
             throw new EOFException();
@@ -251,7 +251,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
      * @throws IOException
      *             if this file is closed or another I/O error occurs.
      */
-    public final long readLong() throws IOException {
+    public long readLong() throws IOException {
         byte[] buffer = new byte[8];
         int n = read(buffer, 0, buffer.length);
         if (n != buffer.length) {
@@ -276,7 +276,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
      * @throws IOException
      *             if this file is closed or another I/O error occurs.
      */
-    public final short readShort() throws IOException {
+    public short readShort() throws IOException {
         byte[] buffer = new byte[2];
         if (read(buffer, 0, buffer.length) != buffer.length) {
             throw new EOFException();
@@ -314,7 +314,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
      * @throws IOException
      *             if this file is closed or another I/O error occurs.
      */
-    public final int readUnsignedShort() throws IOException {
+    public int readUnsignedShort() throws IOException {
         byte[] buffer = new byte[2];
         if (read(buffer, 0, buffer.length) != buffer.length) {
             throw new EOFException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
new file mode 100644
index 0000000..be26094
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+public abstract class AbstractDataOutput extends OutputStream implements DataOutput
+{
+    /*
+    !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+    */
+
+    /**
+     * Writes the entire contents of the byte array <code>buffer</code> to
+     * this RandomAccessFile starting at the current file pointer.
+     * 
+     * @param buffer
+     *            the buffer to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs trying to write to this RandomAccessFile.
+     */
+    public void write(byte[] buffer) throws IOException {
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+     * starting at <code>offset</code> to this RandomAccessFile starting at
+     * the current file pointer..
+     * 
+     * @param buffer
+     *            the bytes to be written
+     * @param offset
+     *            offset in buffer to get bytes
+     * @param count
+     *            number of bytes in buffer to write
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             RandomAccessFile.
+     * @throws IndexOutOfBoundsException
+     *             If offset or count are outside of bounds.
+     */
+    public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+    /**
+     * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+     * starting at the current file pointer. Only the low order byte of
+     * <code>oneByte</code> is written.
+     * 
+     * @param oneByte
+     *            the byte to be written
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             RandomAccessFile.
+     */
+    public abstract void write(int oneByte) throws IOException;
+
+    /**
+     * Writes a boolean to this output stream.
+     * 
+     * @param val
+     *            the boolean value to write to the OutputStream
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeBoolean(boolean val) throws IOException {
+        write(val ? 1 : 0);
+    }
+
+    /**
+     * Writes a 8-bit byte to this output stream.
+     * 
+     * @param val
+     *            the byte value to write to the OutputStream
+     * 
+     * @throws java.io.IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeByte(int val) throws IOException {
+        write(val & 0xFF);
+    }
+
+    /**
+     * Writes the low order 8-bit bytes from a String to this output stream.
+     * 
+     * @param str
+     *            the String containing the bytes to write to the OutputStream
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeBytes(String str) throws IOException {
+        byte bytes[] = new byte[str.length()];
+        for (int index = 0; index < str.length(); index++) {
+            bytes[index] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(bytes);
+    }
+
+    /**
+     * Writes the specified 16-bit character to the OutputStream. Only the lower
+     * 2 bytes are written with the higher of the 2 bytes written first. This
+     * represents the Unicode value of val.
+     * 
+     * @param val
+     *            the character to be written
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeChar(int val) throws IOException {
+        byte[] buffer = new byte[2];
+        buffer[0] = (byte) (val >> 8);
+        buffer[1] = (byte) val;
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes the specified 16-bit characters contained in str to the
+     * OutputStream. Only the lower 2 bytes of each character are written with
+     * the higher of the 2 bytes written first. This represents the Unicode
+     * value of each character in str.
+     * 
+     * @param str
+     *            the String whose characters are to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeChars(String str) throws IOException {
+        byte newBytes[] = new byte[str.length() * 2];
+        for (int index = 0; index < str.length(); index++) {
+            int newIndex = index == 0 ? index : index * 2;
+            newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+            newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(newBytes);
+    }
+
+    /**
+     * Writes a 64-bit double to this output stream. The resulting output is the
+     * 8 bytes resulting from calling Double.doubleToLongBits().
+     * 
+     * @param val
+     *            the double to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeDouble(double val) throws IOException {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /**
+     * Writes a 32-bit float to this output stream. The resulting output is the
+     * 4 bytes resulting from calling Float.floatToIntBits().
+     * 
+     * @param val
+     *            the float to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeFloat(float val) throws IOException {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /**
+     * Writes a 32-bit int to this output stream. The resulting output is the 4
+     * bytes, highest order first, of val.
+     * 
+     * @param val
+     *            the int to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public void writeInt(int val) throws IOException {
+        byte[] buffer = new byte[4];
+        buffer[0] = (byte) (val >> 24);
+        buffer[1] = (byte) (val >> 16);
+        buffer[2] = (byte) (val >> 8);
+        buffer[3] = (byte) val;
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes a 64-bit long to this output stream. The resulting output is the 8
+     * bytes, highest order first, of val.
+     * 
+     * @param val
+     *            the long to be written.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public void writeLong(long val) throws IOException {
+        byte[] buffer = new byte[8];
+        int t = (int) (val >> 32);
+        buffer[0] = (byte) (t >> 24);
+        buffer[1] = (byte) (t >> 16);
+        buffer[2] = (byte) (t >> 8);
+        buffer[3] = (byte) t;
+        buffer[4] = (byte) (val >> 24);
+        buffer[5] = (byte) (val >> 16);
+        buffer[6] = (byte) (val >> 8);
+        buffer[7] = (byte) val;
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+     * bytes are written with the higher of the 2 bytes written first.
+     * 
+     * @param val
+     *            the short to be written
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public void writeShort(int val) throws IOException {
+        writeChar(val);
+    }
+
+    /**
+     * Writes the specified String out in UTF format.
+     * 
+     * @param str
+     *            the String to be written in UTF format.
+     * 
+     * @throws IOException
+     *             If an error occurs attempting to write to this
+     *             DataOutputStream.
+     */
+    public final void writeUTF(String str) throws IOException {
+        int utfCount = 0, length = str.length();
+        for (int i = 0; i < length; i++) {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127) {
+                utfCount++;
+            } else if (charValue <= 2047) {
+                utfCount += 2;
+            } else {
+                utfCount += 3;
+            }
+        }
+        if (utfCount > 65535) {
+            throw new UTFDataFormatException(); //$NON-NLS-1$
+        }
+        byte utfBytes[] = new byte[utfCount + 2];
+        int utfIndex = 2;
+        for (int i = 0; i < length; i++) {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127) {
+                utfBytes[utfIndex++] = (byte) charValue;
+            } else if (charValue <= 2047) {
+                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            } else {
+                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            }
+        }
+        utfBytes[0] = (byte) (utfCount >> 8);
+        utfBytes[1] = (byte) utfCount;
+        write(utfBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 4d3c4af..7eb7020 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -21,7 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
@@ -72,10 +72,22 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
      *
      * @return serialized size of the given bloom filter
      */
-    public long serializedSize(BloomFilter bf)
+    public long serializedSize(BloomFilter bf, DBTypeSizes typeSizes)
     {
-        return DBConstants.INT_SIZE // hash count
-               + DBConstants.INT_SIZE // length
-               + bf.bitset.getNumWords() * DBConstants.LONG_SIZE; // buckets
+        int bitLength = bf.bitset.getNumWords();
+        int pageSize = bf.bitset.getPageSize();
+        int pageCount = bf.bitset.getPageCount();
+
+        int size = 0;
+        size += typeSizes.sizeof(bf.getHashCount()); // hash count
+        size += typeSizes.sizeof(bitLength); // length
+
+        for (int p = 0; p < pageCount; p++)
+        {
+            long[] bits = bf.bitset.getPage(p);
+            for (int i = 0; i < pageSize && bitLength-- > 0; i++)
+                size += typeSizes.sizeof(bits[i]); // bucket
+        }
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index b5bb859..027bc99 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -341,8 +341,7 @@ public class ByteBufferUtil
         assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
         try
         {
-            out.writeByte((length >> 8) & 0xFF);
-            out.writeByte(length & 0xFF);
+            out.writeShort(length);
             write(buffer, out); // writing data bytes to output source
         }
         catch (IOException e)
@@ -365,8 +364,7 @@ public class ByteBufferUtil
     /* @return An unsigned short in an integer. */
     public static int readShortLength(DataInput in) throws IOException
     {
-        int length = (in.readByte() & 0xFF) << 8;
-        return length | (in.readByte() & 0xFF);
+        return in.readUnsignedShort();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 7a184d8..9cabb9f 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 
 public class EstimatedHistogram
@@ -272,7 +273,7 @@ public class EstimatedHistogram
             return new EstimatedHistogram(offsets, buckets);
         }
 
-        public long serializedSize(EstimatedHistogram object)
+        public long serializedSize(EstimatedHistogram object, DBTypeSizes typeSizes)
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 93c642f..ab453f8 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -18,16 +18,18 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.db.DBTypeSizes;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FilterFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class);
+    private static final DBTypeSizes TYPE_SIZES = DBTypeSizes.NATIVE;
 
     public enum Type
     {
@@ -80,9 +82,9 @@ public class FilterFactory
             case SHA:
                 return LegacyBloomFilter.serializer.serializedSize((LegacyBloomFilter) bf);
             case MURMUR2:
-                return Murmur2BloomFilter.serializer.serializedSize((Murmur2BloomFilter) bf);
+                return Murmur2BloomFilter.serializer.serializedSize((Murmur2BloomFilter) bf, TYPE_SIZES);
             default:
-                return Murmur3BloomFilter.serializer.serializedSize((Murmur3BloomFilter) bf);
+                return Murmur3BloomFilter.serializer.serializedSize((Murmur3BloomFilter) bf, TYPE_SIZES);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index 98f1ce9..df675e6 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.utils;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.ISerializer;
 
 import java.io.DataInput;
@@ -193,7 +193,7 @@ public class StreamingHistogram
             return new StreamingHistogram(maxBinSize, tmp);
         }
 
-        public long serializedSize(StreamingHistogram histogram)
+        public long serializedSize(StreamingHistogram histogram, DBTypeSizes typeSizes)
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
new file mode 100644
index 0000000..b35d180
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.vint;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.AbstractDataInput;
+
+/**
+ * Borrows idea from
+ * https://developers.google.com/protocol-buffers/docs/encoding#varints
+ * 
+ * Should be used with EncodedDataOutputStream
+ */
+public class EncodedDataInputStream extends AbstractDataInput
+{
+    private DataInput input;
+
+    public EncodedDataInputStream(DataInput input)
+    {
+        this.input = input;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        return input.skipBytes(n);
+    }
+
+    public int read() throws IOException
+    {
+        return input.readByte() & 0xFF;
+    }
+
+    protected void seekInternal(int position)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected int getPosition()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /* as all of the integer types could be decoded using VInt we can use single method vintEncode */
+
+    public int readInt() throws IOException
+    {
+        return (int) vintDecode();
+    }
+
+    public long readLong() throws IOException
+    {
+        return vintDecode();
+    }
+
+    public int readUnsignedShort() throws IOException
+    {
+        return (short) vintDecode();
+    }
+    
+    public short readShort() throws IOException
+    {
+        return (short) vintDecode();
+    }
+
+    private long vintDecode() throws IOException
+    {
+        byte firstByte = input.readByte();
+        int len = vintDecodeSize(firstByte);
+        if (len == 1)
+            return firstByte;
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++)
+        {
+            byte b = input.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (vintIsNegative(firstByte) ? (i ^ -1L) : i);
+    }
+
+    private int vintDecodeSize(byte value)
+    {
+        if (value >= -112)
+        {
+            return 1;
+        }
+        else if (value < -120)
+        {
+            return -119 - value;
+        }
+        return -111 - value;
+    }
+
+    private boolean vintIsNegative(byte value)
+    {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
new file mode 100644
index 0000000..92612b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.vint;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.cassandra.io.util.AbstractDataOutput;
+
+/**
+ * Borrows idea from
+ * https://developers.google.com/protocol-buffers/docs/encoding#varints
+ */
+public class EncodedDataOutputStream extends AbstractDataOutput
+{
+    private OutputStream out;
+
+    public EncodedDataOutputStream(OutputStream out)
+    {
+        this.out = out;
+    }
+
+    public void write(int b) throws IOException
+    {
+        out.write(b);
+    }
+
+    public void write(byte[] b) throws IOException
+    {
+        out.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        out.write(b, off, len);
+    }
+
+    /* as all of the integer types could be encoded using VInt we can use single method vintEncode */
+
+    public void writeInt(int v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    public void writeLong(long v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    public void writeShort(int v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    private void vintEncode(long i) throws IOException
+    {
+        if (i >= -112 && i <= 127)
+        {
+            writeByte((byte) i);
+            return;
+        }
+        int len = -112;
+        if (i < 0)
+        {
+            i ^= -1L; // take one's complement'
+            len = -120;
+        }
+        long tmp = i;
+        while (tmp != 0)
+        {
+            tmp = tmp >> 8;
+            len--;
+        }
+        writeByte((byte) len);
+        len = (len < -120) ? -(len + 120) : -(len + 112);
+        for (int idx = len; idx != 0; idx--)
+        {
+            int shiftbits = (idx - 1) * 8;
+            long mask = 0xFFL << shiftbits;
+            writeByte((byte) ((i & mask) >> shiftbits));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1f6a799..fd5259a 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -75,6 +75,11 @@ public class Util
         return new Column(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp);
     }
 
+    public static Column counterColumn(String name, long value, long timestamp)
+    {
+        return new CounterUpdateColumn(ByteBufferUtil.bytes(name), value, timestamp);
+    }
+
     public static SuperColumn superColumn(ColumnFamily cf, String name, Column... columns)
     {
         SuperColumn sc = new SuperColumn(ByteBufferUtil.bytes(name), cf.metadata().comparator);