You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/02 10:40:29 UTC

[5/5] cassandra git commit: Switch to DataInputPlus

Switch to DataInputPlus

patch by ariel; reviewed by benedict for CASSANDRA-9499


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03f72acd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03f72acd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03f72acd

Branch: refs/heads/trunk
Commit: 03f72acd546407c7f9de2a976de31dcd565dba9a
Parents: 1491a40
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Wed Jul 1 16:27:43 2015 -0400
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 2 09:39:58 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java |   8 +-
 .../org/apache/cassandra/cache/OHCProvider.java | 123 ++------------
 .../cassandra/cache/SerializingCache.java       |  12 +-
 .../cache/SerializingCacheProvider.java         |  13 +-
 .../org/apache/cassandra/config/CFMetaData.java |   2 +-
 .../cassandra/db/AbstractLivenessInfo.java      |   6 +-
 .../apache/cassandra/db/BatchlogManager.java    |   7 +-
 .../org/apache/cassandra/db/Clustering.java     |   4 +-
 .../apache/cassandra/db/ClusteringPrefix.java   |  10 +-
 src/java/org/apache/cassandra/db/Columns.java   |   6 +-
 .../apache/cassandra/db/CounterMutation.java    |   6 +-
 src/java/org/apache/cassandra/db/DataRange.java |   2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |   2 +-
 .../org/apache/cassandra/db/DeletionTime.java   |   9 +-
 .../cassandra/db/HintedHandOffManager.java      |   7 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |  13 +-
 src/java/org/apache/cassandra/db/Mutation.java  |  17 +-
 .../apache/cassandra/db/PartitionPosition.java  |   2 +-
 .../org/apache/cassandra/db/RangeTombstone.java |   6 +-
 .../apache/cassandra/db/RangeTombstoneList.java |  29 ++--
 .../org/apache/cassandra/db/ReadCommand.java    |  13 +-
 .../org/apache/cassandra/db/ReadResponse.java   |  12 +-
 .../org/apache/cassandra/db/RowIndexEntry.java  |  20 +--
 .../cassandra/db/SerializationHeader.java       |  36 ++--
 .../org/apache/cassandra/db/Serializers.java    |   9 +-
 .../db/SinglePartitionReadCommand.java          |   3 +-
 src/java/org/apache/cassandra/db/Slice.java     |  12 +-
 src/java/org/apache/cassandra/db/Slices.java    |   6 +-
 .../apache/cassandra/db/SnapshotCommand.java    |  12 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   6 +-
 .../apache/cassandra/db/TruncateResponse.java   |  10 +-
 .../org/apache/cassandra/db/Truncation.java     |   6 +-
 src/java/org/apache/cassandra/db/TypeSizes.java |  91 +++-------
 .../cassandra/db/UnfilteredDeserializer.java    |  13 +-
 .../org/apache/cassandra/db/WriteResponse.java  |   4 +-
 .../db/commitlog/CommitLogReplayer.java         |  10 +-
 .../cassandra/db/commitlog/ReplayPosition.java  |   8 +-
 .../cassandra/db/context/CounterContext.java    |   8 +-
 .../filter/AbstractClusteringIndexFilter.java   |   7 +-
 .../db/filter/ClusteringIndexNamesFilter.java   |   4 +-
 .../db/filter/ClusteringIndexSliceFilter.java   |   4 +-
 .../cassandra/db/filter/ColumnFilter.java       |  10 +-
 .../cassandra/db/filter/ColumnSubselection.java |  10 +-
 .../apache/cassandra/db/filter/DataLimits.java  |  17 +-
 .../apache/cassandra/db/filter/RowFilter.java   |  16 +-
 .../cassandra/db/marshal/AbstractType.java      |   4 +-
 .../cassandra/db/marshal/CollectionType.java    |   5 +-
 .../partitions/ArrayBackedCachedPartition.java  |  10 +-
 .../db/partitions/PartitionUpdate.java          |  17 +-
 .../org/apache/cassandra/db/rows/CellPath.java  |   3 +-
 .../org/apache/cassandra/db/rows/RowStats.java  |  10 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |  24 +--
 .../cassandra/db/rows/UnfilteredSerializer.java |  64 +++----
 .../apache/cassandra/dht/AbstractBounds.java    |   2 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |   8 +-
 src/java/org/apache/cassandra/dht/Token.java    |   2 +-
 .../org/apache/cassandra/gms/EchoMessage.java   |   4 +-
 .../org/apache/cassandra/gms/EndpointState.java |   9 +-
 .../org/apache/cassandra/gms/GossipDigest.java  |   7 +-
 .../apache/cassandra/gms/GossipDigestAck.java   |   6 +-
 .../apache/cassandra/gms/GossipDigestAck2.java  |   5 +-
 .../apache/cassandra/gms/GossipDigestSyn.java   |  11 +-
 .../apache/cassandra/gms/HeartBeatState.java    |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   7 +-
 .../org/apache/cassandra/io/ISerializer.java    |   7 +-
 .../cassandra/io/IVersionedSerializer.java      |   4 +-
 .../io/compress/CompressionMetadata.java        |   7 +-
 .../io/compress/CompressionParameters.java      |  14 +-
 .../cassandra/io/sstable/IndexHelper.java       |  17 +-
 .../io/sstable/SSTableSimpleIterator.java       |   5 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   4 +-
 .../io/sstable/metadata/CompactionMetadata.java |  10 +-
 .../metadata/IMetadataComponentSerializer.java  |   4 +-
 .../metadata/LegacyMetadataSerializer.java      |   4 +-
 .../io/sstable/metadata/StatsMetadata.java      |  17 +-
 .../io/sstable/metadata/ValidationMetadata.java |   6 +-
 .../cassandra/io/util/AbstractDataInput.java    |  36 +---
 .../apache/cassandra/io/util/DataInputPlus.java | 168 +++++++++++++++++++
 .../cassandra/io/util/DataOutputPlus.java       | 109 ++++++++++++
 .../apache/cassandra/io/util/FileDataInput.java |   3 +-
 .../cassandra/io/util/NIODataInputStream.java   |  74 +++++++-
 .../net/IncomingStreamingConnection.java        |   9 +-
 .../cassandra/net/IncomingTcpConnection.java    |  13 +-
 .../org/apache/cassandra/net/MessageIn.java     |   5 +-
 .../org/apache/cassandra/net/MessageOut.java    |  10 +-
 .../apache/cassandra/net/MessagingService.java  |   5 +-
 .../org/apache/cassandra/repair/NodePair.java   |   4 +-
 .../apache/cassandra/repair/RepairJobDesc.java  |  10 +-
 .../repair/messages/AnticompactionRequest.java  |   4 +-
 .../repair/messages/CleanupMessage.java         |   4 +-
 .../repair/messages/PrepareMessage.java         |  11 +-
 .../repair/messages/RepairMessage.java          |   4 +-
 .../repair/messages/SnapshotMessage.java        |   4 +-
 .../cassandra/repair/messages/SyncComplete.java |   6 +-
 .../cassandra/repair/messages/SyncRequest.java  |   6 +-
 .../repair/messages/ValidationComplete.java     |   6 +-
 .../repair/messages/ValidationRequest.java      |   6 +-
 .../apache/cassandra/service/CacheService.java  |  10 +-
 .../cassandra/service/MigrationManager.java     |  11 +-
 .../apache/cassandra/service/paxos/Commit.java  |  18 +-
 .../service/paxos/PrepareResponse.java          |  17 +-
 .../cassandra/streaming/StreamReader.java       |   8 +-
 .../cassandra/streaming/StreamRequest.java      |  14 +-
 .../cassandra/streaming/StreamSummary.java      |   8 +-
 .../streaming/compress/CompressionInfo.java     |   8 +-
 .../streaming/messages/FileMessageHeader.java   |  20 +--
 .../streaming/messages/IncomingFileMessage.java |   4 +-
 .../streaming/messages/PrepareMessage.java      |   4 +-
 .../streaming/messages/ReceivedMessage.java     |   4 +-
 .../streaming/messages/RetryMessage.java        |   4 +-
 .../streaming/messages/StreamInitMessage.java   |  14 +-
 .../org/apache/cassandra/utils/BloomFilter.java |   3 +-
 .../cassandra/utils/BloomFilterSerializer.java  |  12 +-
 .../cassandra/utils/BooleanSerializer.java      |   4 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |   8 +-
 .../cassandra/utils/BytesReadTracker.java       |   4 +-
 .../cassandra/utils/EstimatedHistogram.java     |  13 +-
 .../apache/cassandra/utils/IntervalTree.java    |  23 +--
 .../org/apache/cassandra/utils/MerkleTree.java  |  17 +-
 .../cassandra/utils/StreamingHistogram.java     |  10 +-
 .../apache/cassandra/utils/UUIDSerializer.java  |   6 +-
 .../org/apache/cassandra/utils/obs/IBitSet.java |   4 +-
 .../cassandra/utils/obs/OffHeapBitSet.java      |   6 +-
 .../apache/cassandra/utils/obs/OpenBitSet.java  |   6 +-
 .../utils/vint/EncodedDataInputStream.java      |  94 -----------
 .../utils/vint/EncodedDataOutputStream.java     |  69 --------
 .../db/commitlog/CommitLogStressTest.java       |  11 +-
 .../cassandra/AbstractSerializationsTester.java |   8 +-
 .../org/apache/cassandra/db/PartitionTest.java  |  11 +-
 .../apache/cassandra/db/ReadMessageTest.java    |   9 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   8 +-
 .../apache/cassandra/gms/GossipDigestTest.java  |   9 +-
 .../cassandra/gms/SerializationsTest.java       |   6 +-
 .../cassandra/service/SerializationsTest.java   |  11 +-
 .../cassandra/utils/EncodedStreamsTest.java     | 158 -----------------
 .../cassandra/utils/IntervalTreeTest.java       |  17 +-
 .../apache/cassandra/utils/MerkleTreeTest.java  |   8 +-
 .../cassandra/utils/SerializationsTest.java     |   4 +-
 .../cassandra/utils/StreamingHistogramTest.java |   4 +-
 139 files changed, 971 insertions(+), 1146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 9dda019..0b334f5 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -39,6 +38,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -132,11 +132,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
         if (path.exists())
         {
-            DataInputStream in = null;
+            DataInputStreamPlus in = null;
             try
             {
                 logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+                in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -356,6 +356,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     {
         void serialize(K key, DataOutputPlus out) throws IOException;
 
-        Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
+        Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 981a1f8..46cbb8b 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -20,19 +20,15 @@ package org.apache.cassandra.cache;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
 import java.util.Iterator;
 import java.util.UUID;
 
-import com.google.common.base.Function;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputPlusAdapter;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.net.MessagingService;
 import org.caffinitas.ohc.OHCache;
 import org.caffinitas.ohc.OHCacheBuilder;
 
@@ -42,8 +38,8 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
     {
         OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
         builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024)
-               .keySerializer(new KeySerializer())
-               .valueSerializer(new ValueSerializer())
+               .keySerializer(KeySerializer.instance)
+               .valueSerializer(ValueSerializer.instance)
                .throwOOME(true);
 
         return new OHCacheAdapter(builder.build());
@@ -70,7 +66,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 
         public void put(RowCacheKey key, IRowCacheEntry value)
         {
-            ohCache.put(key, value);
+            ohCache.put(key,  value);
         }
 
         public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
@@ -126,6 +122,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 
     private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
     {
+        private static KeySerializer instance = new KeySerializer();
         public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
         {
             dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits());
@@ -151,6 +148,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 
     private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry>
     {
+        private static ValueSerializer instance = new ValueSerializer();
         public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
         {
             assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
@@ -159,7 +157,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
             if (isSentinel)
                 out.writeLong(((RowCacheSentinel) entry).sentinelId);
             else
-                CachedPartition.cacheSerializer.serialize((CachedPartition)entry, new DataOutputPlusAdapter(out));
+                CachedPartition.cacheSerializer.serialize((CachedPartition)entry, new DataOutputPlus.DataOutputPlusAdapter(out));
         }
 
         public IRowCacheEntry deserialize(DataInput in) throws IOException
@@ -167,116 +165,17 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
             boolean isSentinel = in.readBoolean();
             if (isSentinel)
                 return new RowCacheSentinel(in.readLong());
-            return CachedPartition.cacheSerializer.deserialize(in);
+            return CachedPartition.cacheSerializer.deserialize(new DataInputPlusAdapter(in));
         }
 
         public int serializedSize(IRowCacheEntry entry)
         {
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            int size = typeSizes.sizeof(true);
+            int size = TypeSizes.sizeof(true);
             if (entry instanceof RowCacheSentinel)
-                size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
+                size += TypeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
             else
-                size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry, typeSizes);
+                size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry);
             return size;
         }
     }
-
-    static class DataOutputPlusAdapter implements DataOutputPlus
-    {
-        private final DataOutput out;
-
-        public void write(byte[] b) throws IOException
-        {
-            out.write(b);
-        }
-
-        public void write(byte[] b, int off, int len) throws IOException
-        {
-            out.write(b, off, len);
-        }
-
-        public void write(int b) throws IOException
-        {
-            out.write(b);
-        }
-
-        public void writeBoolean(boolean v) throws IOException
-        {
-            out.writeBoolean(v);
-        }
-
-        public void writeByte(int v) throws IOException
-        {
-            out.writeByte(v);
-        }
-
-        public void writeBytes(String s) throws IOException
-        {
-            out.writeBytes(s);
-        }
-
-        public void writeChar(int v) throws IOException
-        {
-            out.writeChar(v);
-        }
-
-        public void writeChars(String s) throws IOException
-        {
-            out.writeChars(s);
-        }
-
-        public void writeDouble(double v) throws IOException
-        {
-            out.writeDouble(v);
-        }
-
-        public void writeFloat(float v) throws IOException
-        {
-            out.writeFloat(v);
-        }
-
-        public void writeInt(int v) throws IOException
-        {
-            out.writeInt(v);
-        }
-
-        public void writeLong(long v) throws IOException
-        {
-            out.writeLong(v);
-        }
-
-        public void writeShort(int v) throws IOException
-        {
-            out.writeShort(v);
-        }
-
-        public void writeUTF(String s) throws IOException
-        {
-            out.writeUTF(s);
-        }
-
-        public DataOutputPlusAdapter(DataOutput out)
-        {
-            this.out = out;
-        }
-
-        public void write(ByteBuffer buffer) throws IOException
-        {
-            if (buffer.hasArray())
-                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
-            else
-                throw new UnsupportedOperationException("IMPLEMENT ME");
-        }
-
-        public void write(Memory memory, long offset, long length) throws IOException
-        {
-            throw new UnsupportedOperationException("IMPLEMENT ME");
-        }
-
-        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
-        {
-            throw new UnsupportedOperationException("IMPLEMENT ME");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 0e38922..ae1c428 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -26,12 +26,11 @@ import org.slf4j.LoggerFactory;
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import com.googlecode.concurrentlinkedhashmap.Weigher;
-import org.apache.cassandra.db.TypeSizes;
+
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.io.util.MemoryOutputStream;
-import org.apache.cassandra.utils.vint.EncodedDataInputStream;
-import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 
 /**
  * Serializes cache values off-heap.
@@ -39,7 +38,6 @@ import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
 public class SerializingCache<K, V> implements ICache<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
-    private static final TypeSizes ENCODED_TYPE_SIZES = TypeSizes.VINT;
 
     private static final int DEFAULT_CONCURENCY_LEVEL = 64;
 
@@ -88,7 +86,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
     {
         try
         {
-            return serializer.deserialize(new EncodedDataInputStream(new MemoryInputStream(mem)));
+            return serializer.deserialize(new MemoryInputStream(mem));
         }
         catch (IOException e)
         {
@@ -99,7 +97,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
 
     private RefCountedMemory serialize(V value)
     {
-        long serializedSize = serializer.serializedSize(value, ENCODED_TYPE_SIZES);
+        long serializedSize = serializer.serializedSize(value);
         if (serializedSize > Integer.MAX_VALUE)
             throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes");
 
@@ -115,7 +113,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
 
         try
         {
-            serializer.serialize(value, new EncodedDataOutputStream(new MemoryOutputStream(freeableMemory)));
+            serializer.serialize(value, new WrappedDataOutputStreamPlus(new MemoryOutputStream(freeableMemory)));
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index 70d9e73..1119295 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -17,15 +17,14 @@
  */
 package org.apache.cassandra.cache;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 
 public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 {
@@ -48,7 +47,7 @@ public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRow
                 CachedPartition.cacheSerializer.serialize((CachedPartition)entry, out);
         }
 
-        public IRowCacheEntry deserialize(DataInput in) throws IOException
+        public IRowCacheEntry deserialize(DataInputPlus in) throws IOException
         {
             boolean isSentinel = in.readBoolean();
             if (isSentinel)
@@ -57,13 +56,13 @@ public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRow
             return CachedPartition.cacheSerializer.deserialize(in);
         }
 
-        public long serializedSize(IRowCacheEntry entry, TypeSizes typeSizes)
+        public long serializedSize(IRowCacheEntry entry)
         {
-            int size = typeSizes.sizeof(true);
+            int size = TypeSizes.sizeof(true);
             if (entry instanceof RowCacheSentinel)
-                size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
+                size += TypeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
             else
-                size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry, typeSizes);
+                size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4db53e7..6deee6d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1579,7 +1579,7 @@ public final class CFMetaData
             return metadata;
         }
 
-        public long serializedSize(CFMetaData metadata, int version, TypeSizes sizes)
+        public long serializedSize(CFMetaData metadata, int version)
         {
             // We've made sure it was encoded as 16 bytes whatever the TypeSizes is.
             return 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
index bbda598..4f4b5d1 100644
--- a/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
@@ -85,11 +85,11 @@ public abstract class AbstractLivenessInfo implements LivenessInfo
     {
         int size = 0;
         if (hasTimestamp())
-            size += TypeSizes.NATIVE.sizeof(timestamp());
+            size += TypeSizes.sizeof(timestamp());
         if (hasTTL())
-            size += TypeSizes.NATIVE.sizeof(ttl());
+            size += TypeSizes.sizeof(ttl());
         if (hasLocalDeletionTime())
-            size += TypeSizes.NATIVE.sizeof(localDeletionTime());
+            size += TypeSizes.sizeof(localDeletionTime());
         return size;
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 83a0654..39a7aa6 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -34,7 +35,6 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -47,7 +47,9 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -57,6 +59,7 @@ import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
+
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 public class BatchlogManager implements BatchlogManagerMBean
@@ -315,7 +318,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         private List<Mutation> replayingMutations() throws IOException
         {
-            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+            DataInputPlus in = new NIODataInputStream(data, true);
             int size = in.readInt();
             List<Mutation> mutations = new ArrayList<>(size);
             for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 5ac9671..541556b 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -151,9 +151,9 @@ public abstract class Clustering extends AbstractClusteringPrefix
             ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
         }
 
-        public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types)
         {
-            return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types, sizes);
+            return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
         }
 
         public void deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 73cedb8..36d91e7 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -273,14 +273,14 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
                 return Slice.Bound.serializer.deserializeValues(in, kind, version, types);
         }
 
-        public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types)
         {
             // We shouldn't serialize static clusterings
             assert clustering.kind() != Kind.STATIC_CLUSTERING;
             if (clustering.kind() == Kind.CLUSTERING)
-                return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types, sizes);
+                return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types);
             else
-                return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types, sizes);
+                return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types);
         }
 
         void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
@@ -299,7 +299,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
             }
         }
 
-        long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types)
         {
             if (clustering.size() == 0)
                 return 0;
@@ -311,7 +311,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
                 if (v == null || !v.hasRemaining())
                     continue; // handled in the header
 
-                size += types.get(i).writtenLength(v, sizes);
+                size += types.get(i).writtenLength(v);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 83d39db..94e45dc 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -502,11 +502,11 @@ public class Columns implements Iterable<ColumnDefinition>
                 ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
         }
 
-        public long serializedSize(Columns columns, TypeSizes sizes)
+        public long serializedSize(Columns columns)
         {
-            long size = sizes.sizeof((short)columns.columnCount());
+            long size = TypeSizes.sizeof((short)columns.columnCount());
             for (ColumnDefinition column : columns)
-                size += sizes.sizeofWithShortLength(column.name.bytes);
+                size += TypeSizes.sizeofWithShortLength(column.name.bytes);
             return size;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index f87c66c..978417f 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +36,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -322,7 +322,7 @@ public class CounterMutation implements IMutation
             out.writeUTF(cm.consistency.name());
         }
 
-        public CounterMutation deserialize(DataInput in, int version) throws IOException
+        public CounterMutation deserialize(DataInputPlus in, int version) throws IOException
         {
             Mutation m = Mutation.serializer.deserialize(in, version);
             ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF());
@@ -332,7 +332,7 @@ public class CounterMutation implements IMutation
         public long serializedSize(CounterMutation cm, int version)
         {
             return Mutation.serializer.serializedSize(cm.mutation, version)
-                 + TypeSizes.NATIVE.sizeof(cm.consistency.name());
+                 + TypeSizes.sizeof(cm.consistency.name());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 909d6ed..0d7a762 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -397,7 +397,7 @@ public class DataRange
 
             if (range instanceof Paging)
             {
-                size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes(), TypeSizes.NATIVE);
+                size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes());
                 size += 1; // inclusive boolean
             }
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index e54d6b1..a441c48 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -192,7 +192,7 @@ public class DeletionInfo implements IMeasurableMemory
 
     public int dataSize()
     {
-        int size = TypeSizes.NATIVE.sizeof(partitionDeletion.markedForDeleteAt());
+        int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt());
         return size + (ranges == null ? 0 : ranges.dataSize());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index f070778..67842a7 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -144,7 +145,7 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura
             out.writeLong(delTime.markedForDeleteAt());
         }
 
-        public DeletionTime deserialize(DataInput in) throws IOException
+        public DeletionTime deserialize(DataInputPlus in) throws IOException
         {
             int ldt = in.readInt();
             long mfda = in.readLong();
@@ -158,10 +159,10 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura
             FileUtils.skipBytesFully(in, 4 + 8);
         }
 
-        public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
+        public long serializedSize(DeletionTime delTime)
         {
-            return typeSizes.sizeof(delTime.localDeletionTime())
-                 + typeSizes.sizeof(delTime.markedForDeleteAt());
+            return TypeSizes.sizeof(delTime.localDeletionTime())
+                 + TypeSizes.sizeof(delTime.markedForDeleteAt());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 848ba01..38113c8 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -33,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -53,6 +54,8 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -418,7 +421,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 Cell cell = hint.getCell(hintColumn);
 
                 final long timestamp = cell.livenessInfo().timestamp();
-                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(cell.value()));
+                DataInputPlus in = new NIODataInputStream(cell.value(), true);
                 Mutation mutation;
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index c1a7fd0..9eb7145 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -26,15 +26,16 @@ import java.util.*;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.*;
@@ -330,7 +331,7 @@ public abstract class LegacyLayout
         };
     }
 
-    public static Row extractStaticColumns(CFMetaData metadata, DataInput in, Columns statics) throws IOException
+    public static Row extractStaticColumns(CFMetaData metadata, DataInputPlus in, Columns statics) throws IOException
     {
         assert !statics.isEmpty();
         assert metadata.isCompactTable();
@@ -609,7 +610,7 @@ public abstract class LegacyLayout
         };
     }
 
-    public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInput in, boolean readAllAsDynamic) throws IOException
+    public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInputPlus in, boolean readAllAsDynamic) throws IOException
     {
         while (true)
         {
@@ -677,14 +678,14 @@ public abstract class LegacyLayout
         }
     }
 
-    public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInput in) throws IOException
+    public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInputPlus in) throws IOException
     {
         ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in);
         in.readUnsignedByte();
         return readLegacyRangeTombstoneBody(metadata, in, boundname);
     }
 
-    public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInput in, ByteBuffer boundname) throws IOException
+    public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException
     {
         LegacyBound min = decodeBound(metadata, boundname, true);
         LegacyBound max = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
@@ -1214,7 +1215,7 @@ public abstract class LegacyLayout
                 //rtlSerializer.serialize(info.ranges, out, version);
             }
 
-            public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInput in, int version) throws IOException
+            public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in, int version) throws IOException
             {
                 DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index b2eaf8e..aca6622 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -17,24 +17,22 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -261,7 +259,7 @@ public class Mutation implements IMutation
                 PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
         }
 
-        public Mutation deserialize(DataInput in, int version, SerializationHelper.Flag flag) throws IOException
+        public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
         {
             String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
             if (version < MessagingService.VERSION_20)
@@ -293,28 +291,27 @@ public class Mutation implements IMutation
             return new Mutation(keyspaceName, key, modifications);
         }
 
-        public Mutation deserialize(DataInput in, int version) throws IOException
+        public Mutation deserialize(DataInputPlus in, int version) throws IOException
         {
             return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(Mutation mutation, int version)
         {
-            TypeSizes sizes = TypeSizes.NATIVE;
             int size = 0;
 
             if (version < MessagingService.VERSION_20)
-                size += sizes.sizeof(mutation.getKeyspaceName());
+                size += TypeSizes.sizeof(mutation.getKeyspaceName());
 
             if (version < MessagingService.VERSION_30)
             {
                 int keySize = mutation.key().getKey().remaining();
-                size += sizes.sizeof((short) keySize) + keySize;
+                size += TypeSizes.sizeof((short) keySize) + keySize;
             }
 
-            size += sizes.sizeof(mutation.modifications.size());
+            size += TypeSizes.sizeof(mutation.modifications.size());
             for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
-                size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version, sizes);
+                size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version);
 
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/PartitionPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java
index 1dc940e..afb446d 100644
--- a/src/java/org/apache/cassandra/db/PartitionPosition.java
+++ b/src/java/org/apache/cassandra/db/PartitionPosition.java
@@ -100,7 +100,7 @@ public interface PartitionPosition extends RingPosition<PartitionPosition>
             if (kind == Kind.ROW_KEY)
             {
                 int keySize = ((DecoratedKey)pos).getKey().remaining();
-                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
+                size += TypeSizes.sizeof((short) keySize) + keySize;
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 3373afa..de21950 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -153,11 +153,11 @@ public class RangeTombstone
                 ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
             }
 
-            public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+            public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types)
             {
                 return 1 // kind ordinal
-                     + sizes.sizeof((short)bound.size())
-                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+                     + TypeSizes.sizeof((short)bound.size())
+                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
             }
 
             public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 0c27bc4..c377d10 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -25,12 +24,13 @@ import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -289,12 +289,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     public int dataSize()
     {
-        int dataSize = TypeSizes.NATIVE.sizeof(size);
+        int dataSize = TypeSizes.sizeof(size);
         for (int i = 0; i < size; i++)
         {
             dataSize += starts[i].dataSize() + ends[i].dataSize();
-            dataSize += TypeSizes.NATIVE.sizeof(markedAts[i]);
-            dataSize += TypeSizes.NATIVE.sizeof(delTimes[i]);
+            dataSize += TypeSizes.sizeof(markedAts[i]);
+            dataSize += TypeSizes.sizeof(delTimes[i]);
         }
         return dataSize;
     }
@@ -463,7 +463,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         RangeTombstoneList that = (RangeTombstoneList)o;
         if (size != that.size)
             return false;
-        
+
         for (int i = 0; i < size; i++)
         {
             if (!starts[i].equals(that.starts[i]))
@@ -796,7 +796,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             //}
         }
 
-        public RangeTombstoneList deserialize(DataInput in, int version) throws IOException
+        public RangeTombstoneList deserialize(DataInputPlus in, int version) throws IOException
         {
             // TODO
             throw new UnsupportedOperationException();
@@ -834,27 +834,22 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             //return tombstones;
         }
 
-        public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version)
+        public long serializedSize(RangeTombstoneList tombstones, int version)
         {
             // TODO
             throw new UnsupportedOperationException();
             //if (tombstones == null)
-            //    return typeSizes.sizeof(0);
+            //    return TypeSizes.sizeof(0);
 
-            //long size = typeSizes.sizeof(tombstones.size);
+            //long size = TypeSizes.sizeof(tombstones.size);
             //for (int i = 0; i < tombstones.size; i++)
             //{
             //    size += type.serializer().serializedSize(tombstones.starts[i], typeSizes);
             //    size += type.serializer().serializedSize(tombstones.ends[i], typeSizes);
-            //    size += typeSizes.sizeof(tombstones.delTimes[i]);
-            //    size += typeSizes.sizeof(tombstones.markedAts[i]);
+            //    size += TypeSizes.sizeof(tombstones.delTimes[i]);
+            //    size += TypeSizes.sizeof(tombstones.markedAts[i]);
             //}
             //return size;
         }
-
-        public long serializedSize(RangeTombstoneList tombstones, int version)
-        {
-            return serializedSize(tombstones, TypeSizes.NATIVE, version);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index bad096f..7cc4884 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -21,9 +21,9 @@ import java.io.DataInput;
 import java.io.IOException;
 
 import com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.net.MessageOut;
@@ -481,7 +482,7 @@ public abstract class ReadCommand implements ReadQuery
             command.serializeSelection(out, version);
         }
 
-        public ReadCommand deserialize(DataInput in, int version) throws IOException
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
         {
             if (version < MessagingService.VERSION_30)
                 throw new UnsupportedOperationException();
@@ -504,12 +505,10 @@ public abstract class ReadCommand implements ReadQuery
             if (version < MessagingService.VERSION_30)
                 throw new UnsupportedOperationException();
 
-            TypeSizes sizes = TypeSizes.NATIVE;
-
             return 2 // kind + flags
-                 + CFMetaData.serializer.serializedSize(command.metadata(), version, sizes)
-                 + sizes.sizeof(command.nowInSec())
-                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version, sizes)
+                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
+                 + TypeSizes.sizeof(command.nowInSec())
+                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                  + RowFilter.serializer.serializedSize(command.rowFilter(), version)
                  + DataLimits.serializer.serializedSize(command.limits(), version)
                  + command.selectionSerializedSize(version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 6453077..6a61416 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
@@ -161,7 +162,7 @@ public abstract class ReadResponse
             }
         }
 
-        public ReadResponse deserialize(DataInput in, int version) throws IOException
+        public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             if (version < MessagingService.VERSION_30)
             {
@@ -186,9 +187,8 @@ public abstract class ReadResponse
                 throw new UnsupportedOperationException();
             }
 
-            TypeSizes sizes = TypeSizes.NATIVE;
             boolean isDigest = response.isDigestQuery();
-            long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, sizes);
+            long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
             if (!isDigest)
             {
@@ -196,7 +196,7 @@ public abstract class ReadResponse
                 // version, we'll have to deserialize/re-serialize the data to be in the proper version.
                 assert version == MessagingService.VERSION_30;
                 ByteBuffer data = ((DataResponse)response).data;
-                size += ByteBufferUtil.serializedSizeWithLength(data, sizes);
+                size += ByteBufferUtil.serializedSizeWithLength(data);
             }
             return size;
         }
@@ -213,7 +213,7 @@ public abstract class ReadResponse
             //            Row.serializer.serialize(row, out, version);
         }
 
-        public ReadResponse deserialize(DataInput in, int version) throws IOException
+        public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             // TODO
             throw new UnsupportedOperationException();
@@ -228,7 +228,7 @@ public abstract class ReadResponse
         {
             // TODO
             throw new UnsupportedOperationException();
-            //        int size = TypeSizes.NATIVE.sizeof(rsr.rows.size());
+            //        int size = TypeSizes.sizeof(rsr.rows.size());
             //        for (Row row : rsr.rows)
             //            size += Row.serializer.serializedSize(row, version);
             //        return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 016e26e..e783508 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -101,7 +102,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
     public static interface IndexSerializer<T>
     {
         void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException;
-        RowIndexEntry<T> deserialize(DataInput in) throws IOException;
+        RowIndexEntry<T> deserialize(DataInputPlus in) throws IOException;
         public int serializedSize(RowIndexEntry<T> rie);
     }
 
@@ -133,7 +134,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             }
         }
 
-        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in) throws IOException
+        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus in) throws IOException
         {
             long position = in.readLong();
 
@@ -173,18 +174,18 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
         {
-            int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(metadata, version, header));
+            int size = TypeSizes.sizeof(rie.position) + TypeSizes.sizeof(rie.promotedSize(metadata, version, header));
 
             if (rie.isIndexed())
             {
                 List<IndexHelper.IndexInfo> index = rie.columnsIndex();
 
-                size += DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE);
-                size += TypeSizes.NATIVE.sizeof(index.size());
+                size += DeletionTime.serializer.serializedSize(rie.deletionTime());
+                size += TypeSizes.sizeof(index.size());
 
                 IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
                 for (IndexHelper.IndexInfo info : index)
-                    size += idxSerializer.serializedSize(info, header, TypeSizes.NATIVE);
+                    size += idxSerializer.serializedSize(info, header);
             }
 
 
@@ -227,12 +228,11 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         @Override
         public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header)
         {
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes);
-            size += typeSizes.sizeof(columnsIndex.size()); // number of entries
+            long size = DeletionTime.serializer.serializedSize(deletionTime);
+            size += TypeSizes.sizeof(columnsIndex.size()); // number of entries
             IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info, header, typeSizes);
+                size += idxSerializer.serializedSize(info, header);
 
             return Ints.checkedCast(size);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 304332e..c720804 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -377,13 +378,13 @@ public class SerializationHeader
             return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
         }
 
-        public long serializedSizeForMessaging(SerializationHeader header, TypeSizes sizes, boolean hasStatic)
+        public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic)
         {
-            long size = RowStats.serializer.serializedSize(header.stats, sizes);
+            long size = RowStats.serializer.serializedSize(header.stats);
 
             if (hasStatic)
-                size += Columns.serializer.serializedSize(header.columns.statics, sizes);
-            size += Columns.serializer.serializedSize(header.columns.regulars, sizes);
+                size += Columns.serializer.serializedSize(header.columns.statics);
+            size += Columns.serializer.serializedSize(header.columns.regulars);
             return size;
         }
 
@@ -402,7 +403,7 @@ public class SerializationHeader
         }
 
         // For SSTables
-        public Component deserialize(Version version, DataInput in) throws IOException
+        public Component deserialize(Version version, DataInputPlus in) throws IOException
         {
             RowStats stats = RowStats.serializer.deserialize(in);
 
@@ -424,16 +425,15 @@ public class SerializationHeader
         // For SSTables
         public int serializedSize(Component header)
         {
-            TypeSizes sizes = TypeSizes.NATIVE;
-            int size = RowStats.serializer.serializedSize(header.stats, sizes);
+            int size = RowStats.serializer.serializedSize(header.stats);
 
-            size += sizeofType(header.keyType, sizes);
-            size += sizes.sizeof((short)header.clusteringTypes.size());
+            size += sizeofType(header.keyType);
+            size += TypeSizes.sizeof((short)header.clusteringTypes.size());
             for (AbstractType<?> type : header.clusteringTypes)
-                size += sizeofType(type, sizes);
+                size += sizeofType(type);
 
-            size += sizeofColumnsWithTypes(header.staticColumns, sizes);
-            size += sizeofColumnsWithTypes(header.regularColumns, sizes);
+            size += sizeofColumnsWithTypes(header.staticColumns);
+            size += sizeofColumnsWithTypes(header.regularColumns);
             return size;
         }
 
@@ -447,13 +447,13 @@ public class SerializationHeader
             }
         }
 
-        private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns, TypeSizes sizes)
+        private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns)
         {
-            long size = sizes.sizeof((short)columns.size());
+            long size = TypeSizes.sizeof((short)columns.size());
             for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet())
             {
-                size += sizes.sizeofWithShortLength(entry.getKey());
-                size += sizeofType(entry.getValue(), sizes);
+                size += TypeSizes.sizeofWithShortLength(entry.getKey());
+                size += sizeofType(entry.getValue());
             }
             return size;
         }
@@ -480,9 +480,9 @@ public class SerializationHeader
             return TypeParser.parse(UTF8Type.instance.compose(raw));
         }
 
-        private int sizeofType(AbstractType<?> type, TypeSizes sizes)
+        private int sizeofType(AbstractType<?> type)
         {
-            return sizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString()));
+            return TypeSizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 862d02e..8056222 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -21,6 +21,7 @@ import java.io.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 
@@ -43,7 +44,7 @@ public class Serializers
         return new IndexInfo.Serializer(metadata, version);
     }
 
-    // Note that for the old layout, this will actually discard the cellname parts that are not strictly 
+    // Note that for the old layout, this will actually discard the cellname parts that are not strictly
     // part of the clustering prefix. Don't use this if that's not what you want.
     public ISerializer<ClusteringPrefix> clusteringPrefixSerializer(final Version version, final SerializationHeader header)
     {
@@ -57,14 +58,14 @@ public class Serializers
                 ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes());
             }
 
-            public ClusteringPrefix deserialize(DataInput in) throws IOException
+            public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
             {
                 return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes());
             }
 
-            public long serializedSize(ClusteringPrefix clustering, TypeSizes sizes)
+            public long serializedSize(ClusteringPrefix clustering)
             {
-                return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes(), sizes);
+                return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes());
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 38651c1..3fa8486 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -401,8 +401,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
 
     protected long selectionSerializedSize(int version)
     {
-        TypeSizes sizes = TypeSizes.NATIVE;
-        return metadata().getKeyValidator().writtenLength(partitionKey().getKey(), sizes)
+        return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
              + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index dae491e..05c2977 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -325,10 +325,10 @@ public class Slice
             Bound.serializer.serialize(slice.end, out, version, types);
         }
 
-        public long serializedSize(Slice slice, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        public long serializedSize(Slice slice, int version, List<AbstractType<?>> types)
         {
-            return Bound.serializer.serializedSize(slice.start, version, types, sizes)
-                 + Bound.serializer.serializedSize(slice.end, version, types, sizes);
+            return Bound.serializer.serializedSize(slice.start, version, types)
+                 + Bound.serializer.serializedSize(slice.end, version, types);
         }
 
         public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
@@ -615,11 +615,11 @@ public class Slice
                 ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
             }
 
-            public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+            public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types)
             {
                 return 1 // kind ordinal
-                     + sizes.sizeof((short)bound.size())
-                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+                     + TypeSizes.sizeof((short)bound.size())
+                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
             }
 
             public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index ec7797d..a6c690b 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -301,9 +301,9 @@ public abstract class Slices implements Iterable<Slice>
                 Slice.serializer.serialize(slice, out, version, types);
         }
 
-        public long serializedSize(Slices slices, int version, TypeSizes sizes)
+        public long serializedSize(Slices slices, int version)
         {
-            long size = sizes.sizeof(slices.size());
+            long size = TypeSizes.sizeof(slices.size());
 
             if (slices.size() == 0)
                 return size;
@@ -313,7 +313,7 @@ public abstract class Slices implements Iterable<Slice>
                                         : ((ArrayBackedSlices)slices).comparator.subtypes();
 
             for (Slice slice : slices)
-                size += Slice.serializer.serializedSize(slice, version, types, sizes);
+                size += Slice.serializer.serializedSize(slice, version, types);
 
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index 427e9ec..eb6f67a 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -67,7 +67,7 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
         out.writeBoolean(snapshot_command.clear_snapshot);
     }
 
-    public SnapshotCommand deserialize(DataInput in, int version) throws IOException
+    public SnapshotCommand deserialize(DataInputPlus in, int version) throws IOException
     {
         String keyspace = in.readUTF();
         String column_family = in.readUTF();
@@ -78,9 +78,9 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
 
     public long serializedSize(SnapshotCommand sc, int version)
     {
-        return TypeSizes.NATIVE.sizeof(sc.keyspace)
-             + TypeSizes.NATIVE.sizeof(sc.column_family)
-             + TypeSizes.NATIVE.sizeof(sc.snapshot_name)
-             + TypeSizes.NATIVE.sizeof(sc.clear_snapshot);
+        return TypeSizes.sizeof(sc.keyspace)
+             + TypeSizes.sizeof(sc.column_family)
+             + TypeSizes.sizeof(sc.snapshot_name)
+             + TypeSizes.sizeof(sc.clear_snapshot);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ffbc1eb..f38436b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -22,15 +22,16 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.io.ByteStreams;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -47,6 +48,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.net.MessagingService;
@@ -519,7 +521,7 @@ public final class SystemKeyspace
     {
         try
         {
-            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            NIODataInputStream in = new NIODataInputStream(bytes, true);
             return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/TruncateResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateResponse.java b/src/java/org/apache/cassandra/db/TruncateResponse.java
index d8f5ad2..af4ed8f 100644
--- a/src/java/org/apache/cassandra/db/TruncateResponse.java
+++ b/src/java/org/apache/cassandra/db/TruncateResponse.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -58,7 +58,7 @@ public class TruncateResponse
             out.writeBoolean(tr.success);
         }
 
-        public TruncateResponse deserialize(DataInput in, int version) throws IOException
+        public TruncateResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
@@ -68,9 +68,9 @@ public class TruncateResponse
 
         public long serializedSize(TruncateResponse tr, int version)
         {
-            return TypeSizes.NATIVE.sizeof(tr.keyspace)
-                 + TypeSizes.NATIVE.sizeof(tr.columnFamily)
-                 + TypeSizes.NATIVE.sizeof(tr.success);
+            return TypeSizes.sizeof(tr.keyspace)
+                 + TypeSizes.sizeof(tr.columnFamily)
+                 + TypeSizes.sizeof(tr.success);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Truncation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Truncation.java b/src/java/org/apache/cassandra/db/Truncation.java
index 88742cd..39a2ec6 100644
--- a/src/java/org/apache/cassandra/db/Truncation.java
+++ b/src/java/org/apache/cassandra/db/Truncation.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -60,7 +60,7 @@ class TruncationSerializer implements IVersionedSerializer<Truncation>
         out.writeUTF(t.columnFamily);
     }
 
-    public Truncation deserialize(DataInput in, int version) throws IOException
+    public Truncation deserialize(DataInputPlus in, int version) throws IOException
     {
         String keyspace = in.readUTF();
         String columnFamily = in.readUTF();
@@ -69,6 +69,6 @@ class TruncationSerializer implements IVersionedSerializer<Truncation>
 
     public long serializedSize(Truncation truncation, int version)
     {
-        return TypeSizes.NATIVE.sizeof(truncation.keyspace) + TypeSizes.NATIVE.sizeof(truncation.columnFamily);
+        return TypeSizes.sizeof(truncation.keyspace) + TypeSizes.sizeof(truncation.columnFamily);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index 79d5774..a9e432f 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -22,10 +22,10 @@ import java.util.UUID;
 
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-public abstract class TypeSizes
+public final class TypeSizes
 {
-    public static final TypeSizes NATIVE = new NativeDBTypeSizes();
-    public static final TypeSizes VINT = new VIntEncodedTypeSizes();
+
+    private TypeSizes(){}
 
     private static final int BOOL_SIZE = 1;
     private static final int SHORT_SIZE = 2;
@@ -33,14 +33,8 @@ public abstract class TypeSizes
     private static final int LONG_SIZE = 8;
     private static final int UUID_SIZE = 16;
 
-    public abstract int sizeof(boolean value);
-    public abstract int sizeof(short value);
-    public abstract int sizeof(int value);
-    public abstract int sizeof(long value);
-    public abstract int sizeof(UUID value);
-
     /** assumes UTF8 */
-    public int sizeof(String value)
+    public static int sizeof(String value)
     {
         int length = encodedUTF8Length(value);
         assert length <= Short.MAX_VALUE;
@@ -64,76 +58,43 @@ public abstract class TypeSizes
         return utflen;
     }
 
-    public int sizeofWithShortLength(ByteBuffer value)
+    public static int sizeofWithShortLength(ByteBuffer value)
     {
         return sizeof((short) value.remaining()) + value.remaining();
     }
 
-    public int sizeofWithLength(ByteBuffer value)
+    public static int sizeofWithLength(ByteBuffer value)
     {
         return sizeof(value.remaining()) + value.remaining();
     }
 
-    public static class NativeDBTypeSizes extends TypeSizes
+    public static int sizeof(boolean value)
     {
-        public int sizeof(boolean value)
-        {
-            return BOOL_SIZE;
-        }
-
-        public int sizeof(short value)
-        {
-            return SHORT_SIZE;
-        }
-
-        public int sizeof(int value)
-        {
-            return INT_SIZE;
-        }
-
-        public int sizeof(long value)
-        {
-            return LONG_SIZE;
-        }
-
-        public int sizeof(UUID value)
-        {
-            return UUID_SIZE;
-        }
+        return BOOL_SIZE;
     }
 
-    public static class VIntEncodedTypeSizes extends TypeSizes
+    public static int sizeof(short value)
     {
-        private static final int BOOL_SIZE = 1;
-
-        public int sizeofVInt(long i)
-        {
-            return VIntCoding.computeVIntSize(i);
-        }
-
-        public int sizeof(long i)
-        {
-            return sizeofVInt(i);
-        }
+        return SHORT_SIZE;
+    }
 
-        public int sizeof(boolean i)
-        {
-            return BOOL_SIZE;
-        }
+    public static int sizeof(int value)
+    {
+        return INT_SIZE;
+    }
 
-        public int sizeof(short i)
-        {
-            return sizeofVInt(i);
-        }
+    public static int sizeof(long value)
+    {
+        return LONG_SIZE;
+    }
 
-        public int sizeof(int i)
-        {
-            return sizeofVInt(i);
-        }
+    public static int sizeof(UUID value)
+    {
+        return UUID_SIZE;
+    }
 
-        public int sizeof(UUID value)
-        {
-            return sizeofVInt(value.getMostSignificantBits()) + sizeofVInt(value.getLeastSignificantBits());
-        }
+    public static int sizeofVInt(long value)
+    {
+        return VIntCoding.computeVIntSize(value);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index a15fb61..cf7c2dd 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -39,11 +38,11 @@ public abstract class UnfilteredDeserializer
     private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class);
 
     protected final CFMetaData metadata;
-    protected final DataInput in;
+    protected final DataInputPlus in;
     protected final SerializationHelper helper;
 
     protected UnfilteredDeserializer(CFMetaData metadata,
-                                     DataInput in,
+                                     DataInputPlus in,
                                      SerializationHelper helper)
     {
         this.metadata = metadata;
@@ -52,7 +51,7 @@ public abstract class UnfilteredDeserializer
     }
 
     public static UnfilteredDeserializer create(CFMetaData metadata,
-                                                DataInput in,
+                                                DataInputPlus in,
                                                 SerializationHeader header,
                                                 SerializationHelper helper,
                                                 DeletionTime partitionDeletion,
@@ -116,7 +115,7 @@ public abstract class UnfilteredDeserializer
         private final RangeTombstoneMarker.Builder markerBuilder;
 
         private CurrentDeserializer(CFMetaData metadata,
-                                    DataInput in,
+                                    DataInputPlus in,
                                     SerializationHeader header,
                                     SerializationHelper helper)
         {
@@ -237,7 +236,7 @@ public abstract class UnfilteredDeserializer
         private RangeTombstoneMarker closingMarker;
 
         private OldFormatDeserializer(CFMetaData metadata,
-                                      DataInput in,
+                                      DataInputPlus in,
                                       SerializationHelper helper,
                                       DeletionTime partitionDeletion,
                                       boolean readAllAsDynamic)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index a7b108b..824368e 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -43,7 +43,7 @@ public class WriteResponse
         {
         }
 
-        public WriteResponse deserialize(DataInput in, int version) throws IOException
+        public WriteResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             return new WriteResponse();
         }