You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/02 10:40:26 UTC

[2/5] cassandra git commit: Switch to DataInputPlus

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index eb884be..b925395 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 
@@ -25,6 +24,7 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -182,7 +182,7 @@ public class StreamingHistogram
             }
         }
 
-        public StreamingHistogram deserialize(DataInput in) throws IOException
+        public StreamingHistogram deserialize(DataInputPlus in) throws IOException
         {
             int maxBinSize = in.readInt();
             int size = in.readInt();
@@ -195,11 +195,11 @@ public class StreamingHistogram
             return new StreamingHistogram(maxBinSize, tmp);
         }
 
-        public long serializedSize(StreamingHistogram histogram, TypeSizes typeSizes)
+        public long serializedSize(StreamingHistogram histogram)
         {
-            long size = typeSizes.sizeof(histogram.maxBinSize);
+            long size = TypeSizes.sizeof(histogram.maxBinSize);
             Map<Double, Long> entries = histogram.getAsMap();
-            size += typeSizes.sizeof(entries.size());
+            size += TypeSizes.sizeof(entries.size());
             // size of entries = size * (8(double) + 8(long))
             size += entries.size() * (8L + 8L);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/UUIDSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDSerializer.java b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
index 2aa2b4e..2b174fe 100644
--- a/src/java/org/apache/cassandra/utils/UUIDSerializer.java
+++ b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class UUIDSerializer implements IVersionedSerializer<UUID>
@@ -35,13 +35,13 @@ public class UUIDSerializer implements IVersionedSerializer<UUID>
         out.writeLong(uuid.getLeastSignificantBits());
     }
 
-    public UUID deserialize(DataInput in, int version) throws IOException
+    public UUID deserialize(DataInputPlus in, int version) throws IOException
     {
         return new UUID(in.readLong(), in.readLong());
     }
 
     public long serializedSize(UUID uuid, int version)
     {
-        return TypeSizes.NATIVE.sizeof(uuid.getMostSignificantBits()) + TypeSizes.NATIVE.sizeof(uuid.getLeastSignificantBits());
+        return TypeSizes.sizeof(uuid.getMostSignificantBits()) + TypeSizes.sizeof(uuid.getLeastSignificantBits());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index ed7e54b..3b32fdb 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -21,8 +21,6 @@ import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.cassandra.db.TypeSizes;
-
 public interface IBitSet extends Closeable
 {
     public long capacity();
@@ -46,7 +44,7 @@ public interface IBitSet extends Closeable
 
     public void serialize(DataOutput out) throws IOException;
 
-    public long serializedSize(TypeSizes type);
+    public long serializedSize();
 
     public void clear();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 46c1bd0..00c3e67 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -108,7 +108,7 @@ public class OffHeapBitSet implements IBitSet
         out.writeInt((int) (bytes.size() / 8));
         for (long i = 0; i < bytes.size();)
         {
-            long value = ((bytes.getByte(i++) & 0xff) << 0) 
+            long value = ((bytes.getByte(i++) & 0xff) << 0)
                        + ((bytes.getByte(i++) & 0xff) << 8)
                        + ((bytes.getByte(i++) & 0xff) << 16)
                        + ((long) (bytes.getByte(i++) & 0xff) << 24)
@@ -120,9 +120,9 @@ public class OffHeapBitSet implements IBitSet
         }
     }
 
-    public long serializedSize(TypeSizes type)
+    public long serializedSize()
     {
-        return type.sizeof((int) bytes.size()) + bytes.size();
+        return TypeSizes.sizeof((int) bytes.size()) + bytes.size();
     }
 
     @SuppressWarnings("resource")

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index e793f6c..dc48e5e 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -416,16 +416,16 @@ public class OpenBitSet implements IBitSet
     }
 }
 
-  public long serializedSize(TypeSizes type) {
+  public long serializedSize() {
     int bitLength = getNumWords();
     int pageSize = getPageSize();
     int pageCount = getPageCount();
 
-    long size = type.sizeof(bitLength); // length
+    long size = TypeSizes.sizeof(bitLength); // length
     for (int p = 0; p < pageCount; p++) {
       long[] bits = getPage(p);
       for (int i = 0; i < pageSize && bitLength-- > 0; i++)
-        size += type.sizeof(bits[i]); // bucket
+        size += TypeSizes.sizeof(bits[i]); // bucket
     }
     return size;
   }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
deleted file mode 100644
index 663e176..0000000
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.vint;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.cassandra.io.util.AbstractDataInput;
-
-/**
- * Borrows idea from
- * https://developers.google.com/protocol-buffers/docs/encoding#varints
- *
- * Should be used with EncodedDataOutputStream
- *
- * @deprecated Where possible use NIODataInputStream which has a more efficient implementation of buffered input
- *             for most read methods
- */
-public class EncodedDataInputStream extends AbstractDataInput implements DataInput
-{
-    private DataInput input;
-
-    public EncodedDataInputStream(DataInput input)
-    {
-        this.input = input;
-    }
-
-    public int skipBytes(int n) throws IOException
-    {
-        return input.skipBytes(n);
-    }
-
-    public int read() throws IOException
-    {
-        return input.readByte() & 0xFF;
-    }
-
-    public void seek(long position)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public long getPosition()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public long getPositionLimit()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    protected long length()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    /* as all of the integer types could be decoded using VInt we can use single method vintEncode */
-
-    public int readInt() throws IOException
-    {
-        return (int) VIntCoding.readVInt(input);
-    }
-
-    public long readLong() throws IOException
-    {
-        return VIntCoding.readVInt(input);
-    }
-
-    public int readUnsignedShort() throws IOException
-    {
-        return (short) VIntCoding.readVInt(input);
-    }
-
-    public short readShort() throws IOException
-    {
-        return (short) VIntCoding.readVInt(input);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
deleted file mode 100644
index 7f7613f..0000000
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.vint;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
-
-/**
- * Borrows idea from
- * https://developers.google.com/protocol-buffers/docs/encoding#varints
- */
-public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
-{
-    private OutputStream out;
-
-    public EncodedDataOutputStream(OutputStream out)
-    {
-        this.out = out;
-    }
-
-    public void write(int b) throws IOException
-    {
-        out.write(b);
-    }
-
-    public void write(byte[] b) throws IOException
-    {
-        out.write(b);
-    }
-
-    public void write(byte[] b, int off, int len) throws IOException
-    {
-        out.write(b, off, len);
-    }
-
-    /* as all of the integer types could be encoded using VInt we can use single method vintEncode */
-
-    public void writeInt(int v) throws IOException
-    {
-        writeVInt(v);
-    }
-
-    public void writeLong(long v) throws IOException
-    {
-        writeVInt(v);
-    }
-
-    public void writeShort(int v) throws IOException
-    {
-        writeVInt(v);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 9b4dde7..e1dd953 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -21,13 +21,10 @@ package org.apache.cassandra.db.commitlog;
  *
  */
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -55,14 +52,12 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 public class CommitLogStressTest
 {
@@ -467,11 +462,11 @@ public class CommitLogStressTest
                 // Skip over this mutation.
                 return;
 
-            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+            NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
             Mutation mutation;
             try
             {
-                mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+                mutation = Mutation.serializer.deserialize(bufIn,
                                                            desc.getMessagingVersion(),
                                                            SerializationHelper.Flag.LOCAL);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index ebfa79d..636d673 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -20,12 +20,14 @@
 package org.apache.cassandra;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -59,11 +61,11 @@ public class AbstractSerializationsTester
         assert out.getLength() == serializer.serializedSize(obj, getVersion());
     }
 
-    protected static DataInputStream getInput(String name) throws IOException
+    protected static DataInputStreamPlus getInput(String name) throws IOException
     {
         File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
         assert f.exists() : f.getPath();
-        return new DataInputStream(new FileInputStream(f));
+        return new DataInputPlus.DataInputStreamPlus(new FileInputStream(f));
     }
 
     @SuppressWarnings("resource")

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 9aadc2c..515902b 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.db.rows.RowStats;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.SchemaLoader;
@@ -40,7 +40,6 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.io.DataInputBuffer;
 
 import static junit.framework.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -79,9 +78,7 @@ public class PartitionTest
         DataOutputBuffer bufOut = new DataOutputBuffer();
         CachedPartition.cacheSerializer.serialize(partition, bufOut);
 
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bufOut.getData(), 0, bufOut.getLength());
-        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn);
+        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
 
         assert deserialized != null;
         assert deserialized.metadata().cfName.equals(CF_STANDARD1);
@@ -106,9 +103,7 @@ public class PartitionTest
         DataOutputBuffer bufOut = new DataOutputBuffer();
         CachedPartition.cacheSerializer.serialize(partition, bufOut);
 
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bufOut.getData(), 0, bufOut.getLength());
-        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn);
+        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
 
         assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
         assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index bf6f23d..2475821 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.*;
 import java.io.*;
 
 import com.google.common.base.Predicate;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -140,12 +142,11 @@ public class ReadMessageTest
     {
         IVersionedSerializer<ReadCommand> rms = ReadCommand.serializer;
         DataOutputBuffer out = new DataOutputBuffer();
-        ByteArrayInputStream bis;
 
         rms.serialize(rm, out, MessagingService.current_version);
 
-        bis = new ByteArrayInputStream(out.getData(), 0, out.getLength());
-        return rms.deserialize(new DataInputStream(bis), MessagingService.current_version);
+        DataInputPlus dis = new NIODataInputStream(out.getData());
+        return rms.deserialize(dis, MessagingService.current_version);
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index fed569f..3f1918c 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -18,18 +18,16 @@
 */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 
 import com.google.common.base.Predicate;
 
 import org.junit.Assert;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 /**
  * Utility class for tests needing to examine the commitlog contents.
@@ -61,11 +59,11 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     @Override
     void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
     {
-        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+        NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
         Mutation mutation;
         try
         {
-            mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+            mutation = Mutation.serializer.deserialize(bufIn,
                                                            desc.getMessagingVersion(),
                                                            SerializationHelper.Flag.LOCAL);
             Assert.assertTrue(processor.apply(mutation));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
index 2928b12..6a8a6d3 100644
--- a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.gms;
 
 import static org.junit.Assert.*;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
+
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.MessagingService;
@@ -48,8 +49,8 @@ public class GossipDigestTest
         DataOutputBuffer output = new DataOutputBuffer();
         GossipDigest.serializer.serialize(expected, output, MessagingService.current_version);
 
-        ByteArrayInputStream input = new ByteArrayInputStream(output.getData(), 0, output.getLength());
-        GossipDigest actual = GossipDigest.serializer.deserialize(new DataInputStream(input), MessagingService.current_version);
+        DataInputPlus input = new NIODataInputStream(output.getData());
+        GossipDigest actual = GossipDigest.serializer.deserialize(input, MessagingService.current_version);
         assertEquals(0, expected.compareTo(actual));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 080ae53..bab1ace 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,12 +20,12 @@ package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -58,7 +58,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         if (EXECUTE_WRITES)
             testEndpointStateWrite();
 
-        DataInputStream in = getInput("gms.EndpointState.bin");
+        DataInputStreamPlus in = getInput("gms.EndpointState.bin");
         assert HeartBeatState.serializer.deserialize(in, getVersion()) != null;
         assert EndpointState.serializer.deserialize(in, getVersion()) != null;
         assert VersionedValue.serializer.deserialize(in, getVersion()) != null;
@@ -98,7 +98,7 @@ public class SerializationsTest extends AbstractSerializationsTester
             testGossipDigestWrite();
 
         int count = 0;
-        DataInputStream in = getInput("gms.Gossip.bin");
+        DataInputStreamPlus in = getInput("gms.Gossip.bin");
         while (count < Statics.Digests.size())
             assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null;
         assert GossipDigestAck.serializer.deserialize(in, getVersion()) != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 9c8e0fb..b7af1be 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -25,12 +25,13 @@ import java.util.Collections;
 import java.util.UUID;
 
 import org.junit.Test;
-
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -79,7 +80,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         if (EXECUTE_WRITES)
             testValidationRequestWrite();
 
-        try (DataInputStream in = getInput("service.ValidationRequest.bin"))
+        try (DataInputStreamPlus in = getInput("service.ValidationRequest.bin"))
         {
             RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
             assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST;
@@ -117,7 +118,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         if (EXECUTE_WRITES)
             testValidationCompleteWrite();
 
-        try (DataInputStream in = getInput("service.ValidationComplete.bin"))
+        try (DataInputStreamPlus in = getInput("service.ValidationComplete.bin"))
         {
             // empty validation
             RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
@@ -169,7 +170,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
         InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
 
-        try (DataInputStream in = getInput("service.SyncRequest.bin"))
+        try (DataInputStreamPlus in = getInput("service.SyncRequest.bin"))
         {
             RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
             assert message.messageType == RepairMessage.Type.SYNC_REQUEST;
@@ -205,7 +206,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
         NodePair nodes = new NodePair(src, dest);
 
-        try (DataInputStream in = getInput("service.SyncComplete.bin"))
+        try (DataInputStreamPlus in = getInput("service.SyncComplete.bin"))
         {
             // success
             RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java b/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
deleted file mode 100644
index a2cff63..0000000
--- a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.io.*;
-
-import com.google.common.collect.Iterators;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.vint.EncodedDataInputStream;
-import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
-
-
-public class EncodedStreamsTest
-{
-    private static final String KEYSPACE1 = "Keyspace1";
-    private static final String CF_STANDARD = "Standard1";
-    private static final String CF_COUNTER = "Counter1";
-    private int version = MessagingService.current_version;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-    SchemaLoader.prepareServer();
-    SchemaLoader.createKeyspace(KEYSPACE1,
-                                KeyspaceParams.simple(1),
-                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
-                                SchemaLoader.counterCFMD(KEYSPACE1, CF_COUNTER));
-    }
-
-    @Test
-    public void testStreams() throws IOException
-    {
-        ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
-        EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-
-        ByteArrayOutputStream byteArrayOStream2 = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(byteArrayOStream2);
-        
-        for (short i = 0; i < 10000; i++)
-        {
-            out.writeShort(i);
-            odos.writeShort(i);
-        }
-        out.flush();
-        odos.flush();
-
-        for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++)
-        {
-            out.writeInt(i);
-            odos.writeInt(i);
-        }
-        out.flush();
-        odos.flush();
-
-        for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE + 10000);i++)
-        {
-            out.writeLong(i);
-            odos.writeLong(i);
-        }
-        out.flush();
-        odos.flush();
-        Assert.assertTrue(byteArrayOStream1.size() < byteArrayOStream2.size());
-
-        ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
-        EncodedDataInputStream idis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-
-        // assert reading Short
-        for (int i = 0; i < 10000; i++)
-            Assert.assertEquals(i, idis.readShort());
-
-        // assert reading Integer
-        for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++)
-            Assert.assertEquals(i, idis.readInt());
-
-        // assert reading Long
-        for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE) + 1000; i++)
-            Assert.assertEquals(i, idis.readLong());
-    }
-
-    private UnfilteredRowIterator createTable()
-    {
-        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD).metadata;
-
-        RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key");
-
-        builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), "try").build();
-        builder.clustering("to").add(cfm.partitionColumns().iterator().next(), "be_nice").build();
-
-        return builder.unfilteredIterator();
-    }
-
-    private UnfilteredRowIterator createCounterTable()
-    {
-        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER).metadata;
-        RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key");
-
-        builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), 1L).build();
-        builder.clustering("wants").add(cfm.partitionColumns().iterator().next(), 1000000L).build();
-
-        return builder.unfilteredIterator();
-    }
-
-    @Test
-    public void testCFSerialization() throws IOException
-    {
-        ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
-        EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-        UnfilteredRowIteratorSerializer.serializer.serialize(createTable(), odos, version, 1);
-
-        ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
-        EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-        UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL);
-        Assert.assertTrue(Iterators.elementsEqual(partition, createTable()));
-        Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createTable(), version, 1, TypeSizes.VINT));
-    }
-
-    @Test
-    public void testCounterCFSerialization() throws IOException
-    {
-        ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
-        EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-        UnfilteredRowIteratorSerializer.serializer.serialize(createCounterTable(), odos, version, 1);
-
-        ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
-        EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-        UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL);
-        Assert.assertTrue(Iterators.elementsEqual(partition, createCounterTable()));
-        Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createCounterTable(), version, 1, TypeSizes.VINT));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index 01d7bd8..bc23f14 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -21,9 +21,6 @@ package org.apache.cassandra.utils;
  */
 
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
@@ -31,12 +28,12 @@ import java.util.Collections;
 import java.util.List;
 
 import org.junit.Test;
-
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 import static org.junit.Assert.assertEquals;
 
@@ -155,12 +152,12 @@ public class IntervalTreeTest
                         out.writeInt(i);
                     }
 
-                    public Integer deserialize(DataInput in) throws IOException
+                    public Integer deserialize(DataInputPlus in) throws IOException
                     {
                         return in.readInt();
                     }
 
-                    public long serializedSize(Integer i, TypeSizes s)
+                    public long serializedSize(Integer i)
                     {
                         return 4;
                     }
@@ -172,12 +169,12 @@ public class IntervalTreeTest
                         out.writeUTF(v);
                     }
 
-                    public String deserialize(DataInput in) throws IOException
+                    public String deserialize(DataInputPlus in) throws IOException
                     {
                         return in.readUTF();
                     }
 
-                    public long serializedSize(String v, TypeSizes s)
+                    public long serializedSize(String v)
                     {
                         return v.length();
                     }
@@ -189,7 +186,7 @@ public class IntervalTreeTest
 
         serializer.serialize(it, out, 0);
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
+        DataInputPlus in = new NIODataInputStream(out.toByteArray());
 
         IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0);
         List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index fe7f506..03c906c 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -22,18 +22,18 @@ import java.math.BigInteger;
 import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteStreams;
+
 import org.junit.Before;
 import org.junit.Test;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.MerkleTree.Hashable;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
@@ -400,7 +400,7 @@ public class MerkleTreeTest
         MerkleTree.serializer.serialize(mt, out, MessagingService.current_version);
         byte[] serialized = out.toByteArray();
 
-        ByteArrayDataInput in = ByteStreams.newDataInput(serialized);
+        DataInputPlus in = new NIODataInputStream(serialized);
         MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version);
 
         assertHashEquals(initialhash, restored.hash(full));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index 0775246..f3809b3 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.service.StorageService;
 
@@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         if (EXECUTE_WRITES)
             testEstimatedHistogramWrite();
 
-        try (DataInputStream in = getInput("utils.EstimatedHistogram.bin"))
+        try (DataInputStreamPlus in = getInput("utils.EstimatedHistogram.bin"))
         {
             Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in));
             Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index 38b2f04..0ea25da 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -24,8 +24,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.junit.Test;
-
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 import static org.junit.Assert.assertEquals;
 
@@ -103,7 +103,7 @@ public class StreamingHistogramTest
         StreamingHistogram.serializer.serialize(hist, out);
         byte[] bytes = out.toByteArray();
 
-        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new NIODataInputStream(bytes));
 
         // deserialized histogram should have following values
         Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);