You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/02 10:40:26 UTC
[2/5] cassandra git commit: Switch to DataInputPlus
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index eb884be..b925395 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.utils;
-import java.io.DataInput;
import java.io.IOException;
import java.util.*;
@@ -25,6 +24,7 @@ import com.google.common.base.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -182,7 +182,7 @@ public class StreamingHistogram
}
}
- public StreamingHistogram deserialize(DataInput in) throws IOException
+ public StreamingHistogram deserialize(DataInputPlus in) throws IOException
{
int maxBinSize = in.readInt();
int size = in.readInt();
@@ -195,11 +195,11 @@ public class StreamingHistogram
return new StreamingHistogram(maxBinSize, tmp);
}
- public long serializedSize(StreamingHistogram histogram, TypeSizes typeSizes)
+ public long serializedSize(StreamingHistogram histogram)
{
- long size = typeSizes.sizeof(histogram.maxBinSize);
+ long size = TypeSizes.sizeof(histogram.maxBinSize);
Map<Double, Long> entries = histogram.getAsMap();
- size += typeSizes.sizeof(entries.size());
+ size += TypeSizes.sizeof(entries.size());
// size of entries = size * (8(double) + 8(long))
size += entries.size() * (8L + 8L);
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/UUIDSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDSerializer.java b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
index 2aa2b4e..2b174fe 100644
--- a/src/java/org/apache/cassandra/utils/UUIDSerializer.java
+++ b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
@@ -17,12 +17,12 @@
*/
package org.apache.cassandra.utils;
-import java.io.DataInput;
import java.io.IOException;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
public class UUIDSerializer implements IVersionedSerializer<UUID>
@@ -35,13 +35,13 @@ public class UUIDSerializer implements IVersionedSerializer<UUID>
out.writeLong(uuid.getLeastSignificantBits());
}
- public UUID deserialize(DataInput in, int version) throws IOException
+ public UUID deserialize(DataInputPlus in, int version) throws IOException
{
return new UUID(in.readLong(), in.readLong());
}
public long serializedSize(UUID uuid, int version)
{
- return TypeSizes.NATIVE.sizeof(uuid.getMostSignificantBits()) + TypeSizes.NATIVE.sizeof(uuid.getLeastSignificantBits());
+ return TypeSizes.sizeof(uuid.getMostSignificantBits()) + TypeSizes.sizeof(uuid.getLeastSignificantBits());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index ed7e54b..3b32fdb 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -21,8 +21,6 @@ import java.io.Closeable;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.cassandra.db.TypeSizes;
-
public interface IBitSet extends Closeable
{
public long capacity();
@@ -46,7 +44,7 @@ public interface IBitSet extends Closeable
public void serialize(DataOutput out) throws IOException;
- public long serializedSize(TypeSizes type);
+ public long serializedSize();
public void clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 46c1bd0..00c3e67 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -108,7 +108,7 @@ public class OffHeapBitSet implements IBitSet
out.writeInt((int) (bytes.size() / 8));
for (long i = 0; i < bytes.size();)
{
- long value = ((bytes.getByte(i++) & 0xff) << 0)
+ long value = ((bytes.getByte(i++) & 0xff) << 0)
+ ((bytes.getByte(i++) & 0xff) << 8)
+ ((bytes.getByte(i++) & 0xff) << 16)
+ ((long) (bytes.getByte(i++) & 0xff) << 24)
@@ -120,9 +120,9 @@ public class OffHeapBitSet implements IBitSet
}
}
- public long serializedSize(TypeSizes type)
+ public long serializedSize()
{
- return type.sizeof((int) bytes.size()) + bytes.size();
+ return TypeSizes.sizeof((int) bytes.size()) + bytes.size();
}
@SuppressWarnings("resource")
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index e793f6c..dc48e5e 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -416,16 +416,16 @@ public class OpenBitSet implements IBitSet
}
}
- public long serializedSize(TypeSizes type) {
+ public long serializedSize() {
int bitLength = getNumWords();
int pageSize = getPageSize();
int pageCount = getPageCount();
- long size = type.sizeof(bitLength); // length
+ long size = TypeSizes.sizeof(bitLength); // length
for (int p = 0; p < pageCount; p++) {
long[] bits = getPage(p);
for (int i = 0; i < pageSize && bitLength-- > 0; i++)
- size += type.sizeof(bits[i]); // bucket
+ size += TypeSizes.sizeof(bits[i]); // bucket
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
deleted file mode 100644
index 663e176..0000000
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.vint;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.cassandra.io.util.AbstractDataInput;
-
-/**
- * Borrows idea from
- * https://developers.google.com/protocol-buffers/docs/encoding#varints
- *
- * Should be used with EncodedDataOutputStream
- *
- * @deprecated Where possible use NIODataInputStream which has a more efficient implementation of buffered input
- * for most read methods
- */
-public class EncodedDataInputStream extends AbstractDataInput implements DataInput
-{
- private DataInput input;
-
- public EncodedDataInputStream(DataInput input)
- {
- this.input = input;
- }
-
- public int skipBytes(int n) throws IOException
- {
- return input.skipBytes(n);
- }
-
- public int read() throws IOException
- {
- return input.readByte() & 0xFF;
- }
-
- public void seek(long position)
- {
- throw new UnsupportedOperationException();
- }
-
- public long getPosition()
- {
- throw new UnsupportedOperationException();
- }
-
- public long getPositionLimit()
- {
- throw new UnsupportedOperationException();
- }
-
- protected long length()
- {
- throw new UnsupportedOperationException();
- }
-
- /* as all of the integer types could be decoded using VInt we can use single method vintEncode */
-
- public int readInt() throws IOException
- {
- return (int) VIntCoding.readVInt(input);
- }
-
- public long readLong() throws IOException
- {
- return VIntCoding.readVInt(input);
- }
-
- public int readUnsignedShort() throws IOException
- {
- return (short) VIntCoding.readVInt(input);
- }
-
- public short readShort() throws IOException
- {
- return (short) VIntCoding.readVInt(input);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
deleted file mode 100644
index 7f7613f..0000000
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.vint;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
-
-/**
- * Borrows idea from
- * https://developers.google.com/protocol-buffers/docs/encoding#varints
- */
-public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
-{
- private OutputStream out;
-
- public EncodedDataOutputStream(OutputStream out)
- {
- this.out = out;
- }
-
- public void write(int b) throws IOException
- {
- out.write(b);
- }
-
- public void write(byte[] b) throws IOException
- {
- out.write(b);
- }
-
- public void write(byte[] b, int off, int len) throws IOException
- {
- out.write(b, off, len);
- }
-
- /* as all of the integer types could be encoded using VInt we can use single method vintEncode */
-
- public void writeInt(int v) throws IOException
- {
- writeVInt(v);
- }
-
- public void writeLong(long v) throws IOException
- {
- writeVInt(v);
- }
-
- public void writeShort(int v) throws IOException
- {
- writeVInt(v);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 9b4dde7..e1dd953 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -21,13 +21,10 @@ package org.apache.cassandra.db.commitlog;
*
*/
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -55,14 +52,12 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.util.NIODataInputStream;
public class CommitLogStressTest
{
@@ -467,11 +462,11 @@ public class CommitLogStressTest
// Skip over this mutation.
return;
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
Mutation mutation;
try
{
- mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+ mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
SerializationHelper.Flag.LOCAL);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index ebfa79d..636d673 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -20,12 +20,14 @@
package org.apache.cassandra;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.net.MessagingService;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -59,11 +61,11 @@ public class AbstractSerializationsTester
assert out.getLength() == serializer.serializedSize(obj, getVersion());
}
- protected static DataInputStream getInput(String name) throws IOException
+ protected static DataInputStreamPlus getInput(String name) throws IOException
{
File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
assert f.exists() : f.getPath();
- return new DataInputStream(new FileInputStream(f));
+ return new DataInputPlus.DataInputStreamPlus(new FileInputStream(f));
}
@SuppressWarnings("resource")
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 9aadc2c..515902b 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.db.rows.RowStats;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.SchemaLoader;
@@ -40,7 +40,6 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.io.DataInputBuffer;
import static junit.framework.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -79,9 +78,7 @@ public class PartitionTest
DataOutputBuffer bufOut = new DataOutputBuffer();
CachedPartition.cacheSerializer.serialize(partition, bufOut);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bufOut.getData(), 0, bufOut.getLength());
- CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn);
+ CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
assert deserialized != null;
assert deserialized.metadata().cfName.equals(CF_STANDARD1);
@@ -106,9 +103,7 @@ public class PartitionTest
DataOutputBuffer bufOut = new DataOutputBuffer();
CachedPartition.cacheSerializer.serialize(partition, bufOut);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bufOut.getData(), 0, bufOut.getLength());
- CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn);
+ CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index bf6f23d..2475821 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.*;
import java.io.*;
import com.google.common.base.Predicate;
+
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -140,12 +142,11 @@ public class ReadMessageTest
{
IVersionedSerializer<ReadCommand> rms = ReadCommand.serializer;
DataOutputBuffer out = new DataOutputBuffer();
- ByteArrayInputStream bis;
rms.serialize(rm, out, MessagingService.current_version);
- bis = new ByteArrayInputStream(out.getData(), 0, out.getLength());
- return rms.deserialize(new DataInputStream(bis), MessagingService.current_version);
+ DataInputPlus dis = new NIODataInputStream(out.getData());
+ return rms.deserialize(dis, MessagingService.current_version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index fed569f..3f1918c 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -18,18 +18,16 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import com.google.common.base.Predicate;
import org.junit.Assert;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.NIODataInputStream;
/**
* Utility class for tests needing to examine the commitlog contents.
@@ -61,11 +59,11 @@ public class CommitLogTestReplayer extends CommitLogReplayer
@Override
void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
{
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
Mutation mutation;
try
{
- mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+ mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
SerializationHelper.Flag.LOCAL);
Assert.assertTrue(processor.apply(mutation));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
index 2928b12..6a8a6d3 100644
--- a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.gms;
import static org.junit.Assert.*;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
+
import java.net.InetAddress;
import org.apache.cassandra.net.MessagingService;
@@ -48,8 +49,8 @@ public class GossipDigestTest
DataOutputBuffer output = new DataOutputBuffer();
GossipDigest.serializer.serialize(expected, output, MessagingService.current_version);
- ByteArrayInputStream input = new ByteArrayInputStream(output.getData(), 0, output.getLength());
- GossipDigest actual = GossipDigest.serializer.deserialize(new DataInputStream(input), MessagingService.current_version);
+ DataInputPlus input = new NIODataInputStream(output.getData());
+ GossipDigest actual = GossipDigest.serializer.deserialize(input, MessagingService.current_version);
assertEquals(0, expected.compareTo(actual));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 080ae53..bab1ace 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,12 +20,12 @@ package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
-import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -58,7 +58,7 @@ public class SerializationsTest extends AbstractSerializationsTester
if (EXECUTE_WRITES)
testEndpointStateWrite();
- DataInputStream in = getInput("gms.EndpointState.bin");
+ DataInputStreamPlus in = getInput("gms.EndpointState.bin");
assert HeartBeatState.serializer.deserialize(in, getVersion()) != null;
assert EndpointState.serializer.deserialize(in, getVersion()) != null;
assert VersionedValue.serializer.deserialize(in, getVersion()) != null;
@@ -98,7 +98,7 @@ public class SerializationsTest extends AbstractSerializationsTester
testGossipDigestWrite();
int count = 0;
- DataInputStream in = getInput("gms.Gossip.bin");
+ DataInputStreamPlus in = getInput("gms.Gossip.bin");
while (count < Statics.Digests.size())
assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null;
assert GossipDigestAck.serializer.deserialize(in, getVersion()) != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 9c8e0fb..b7af1be 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -25,12 +25,13 @@ import java.util.Collections;
import java.util.UUID;
import org.junit.Test;
-
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -79,7 +80,7 @@ public class SerializationsTest extends AbstractSerializationsTester
if (EXECUTE_WRITES)
testValidationRequestWrite();
- try (DataInputStream in = getInput("service.ValidationRequest.bin"))
+ try (DataInputStreamPlus in = getInput("service.ValidationRequest.bin"))
{
RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST;
@@ -117,7 +118,7 @@ public class SerializationsTest extends AbstractSerializationsTester
if (EXECUTE_WRITES)
testValidationCompleteWrite();
- try (DataInputStream in = getInput("service.ValidationComplete.bin"))
+ try (DataInputStreamPlus in = getInput("service.ValidationComplete.bin"))
{
// empty validation
RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
@@ -169,7 +170,7 @@ public class SerializationsTest extends AbstractSerializationsTester
InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
- try (DataInputStream in = getInput("service.SyncRequest.bin"))
+ try (DataInputStreamPlus in = getInput("service.SyncRequest.bin"))
{
RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
assert message.messageType == RepairMessage.Type.SYNC_REQUEST;
@@ -205,7 +206,7 @@ public class SerializationsTest extends AbstractSerializationsTester
InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
NodePair nodes = new NodePair(src, dest);
- try (DataInputStream in = getInput("service.SyncComplete.bin"))
+ try (DataInputStreamPlus in = getInput("service.SyncComplete.bin"))
{
// success
RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java b/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
deleted file mode 100644
index a2cff63..0000000
--- a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.io.*;
-
-import com.google.common.collect.Iterators;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.vint.EncodedDataInputStream;
-import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
-
-
-public class EncodedStreamsTest
-{
- private static final String KEYSPACE1 = "Keyspace1";
- private static final String CF_STANDARD = "Standard1";
- private static final String CF_COUNTER = "Counter1";
- private int version = MessagingService.current_version;
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
- SchemaLoader.counterCFMD(KEYSPACE1, CF_COUNTER));
- }
-
- @Test
- public void testStreams() throws IOException
- {
- ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
- EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-
- ByteArrayOutputStream byteArrayOStream2 = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(byteArrayOStream2);
-
- for (short i = 0; i < 10000; i++)
- {
- out.writeShort(i);
- odos.writeShort(i);
- }
- out.flush();
- odos.flush();
-
- for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++)
- {
- out.writeInt(i);
- odos.writeInt(i);
- }
- out.flush();
- odos.flush();
-
- for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE + 10000);i++)
- {
- out.writeLong(i);
- odos.writeLong(i);
- }
- out.flush();
- odos.flush();
- Assert.assertTrue(byteArrayOStream1.size() < byteArrayOStream2.size());
-
- ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
- EncodedDataInputStream idis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-
- // assert reading Short
- for (int i = 0; i < 10000; i++)
- Assert.assertEquals(i, idis.readShort());
-
- // assert reading Integer
- for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++)
- Assert.assertEquals(i, idis.readInt());
-
- // assert reading Long
- for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE) + 1000; i++)
- Assert.assertEquals(i, idis.readLong());
- }
-
- private UnfilteredRowIterator createTable()
- {
- CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD).metadata;
-
- RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key");
-
- builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), "try").build();
- builder.clustering("to").add(cfm.partitionColumns().iterator().next(), "be_nice").build();
-
- return builder.unfilteredIterator();
- }
-
- private UnfilteredRowIterator createCounterTable()
- {
- CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER).metadata;
- RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key");
-
- builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), 1L).build();
- builder.clustering("wants").add(cfm.partitionColumns().iterator().next(), 1000000L).build();
-
- return builder.unfilteredIterator();
- }
-
- @Test
- public void testCFSerialization() throws IOException
- {
- ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
- EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
- UnfilteredRowIteratorSerializer.serializer.serialize(createTable(), odos, version, 1);
-
- ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
- EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
- UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL);
- Assert.assertTrue(Iterators.elementsEqual(partition, createTable()));
- Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createTable(), version, 1, TypeSizes.VINT));
- }
-
- @Test
- public void testCounterCFSerialization() throws IOException
- {
- ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
- EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
- UnfilteredRowIteratorSerializer.serializer.serialize(createCounterTable(), odos, version, 1);
-
- ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
- EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
- UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL);
- Assert.assertTrue(Iterators.elementsEqual(partition, createCounterTable()));
- Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createCounterTable(), version, 1, TypeSizes.VINT));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index 01d7bd8..bc23f14 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -21,9 +21,6 @@ package org.apache.cassandra.utils;
*/
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
@@ -31,12 +28,12 @@ import java.util.Collections;
import java.util.List;
import org.junit.Test;
-
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.NIODataInputStream;
import static org.junit.Assert.assertEquals;
@@ -155,12 +152,12 @@ public class IntervalTreeTest
out.writeInt(i);
}
- public Integer deserialize(DataInput in) throws IOException
+ public Integer deserialize(DataInputPlus in) throws IOException
{
return in.readInt();
}
- public long serializedSize(Integer i, TypeSizes s)
+ public long serializedSize(Integer i)
{
return 4;
}
@@ -172,12 +169,12 @@ public class IntervalTreeTest
out.writeUTF(v);
}
- public String deserialize(DataInput in) throws IOException
+ public String deserialize(DataInputPlus in) throws IOException
{
return in.readUTF();
}
- public long serializedSize(String v, TypeSizes s)
+ public long serializedSize(String v)
{
return v.length();
}
@@ -189,7 +186,7 @@ public class IntervalTreeTest
serializer.serialize(it, out, 0);
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
+ DataInputPlus in = new NIODataInputStream(out.toByteArray());
IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0);
List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index fe7f506..03c906c 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -22,18 +22,18 @@ import java.math.BigInteger;
import java.util.*;
import com.google.common.collect.AbstractIterator;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteStreams;
+
import org.junit.Before;
import org.junit.Test;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.MerkleTree.Hashable;
import org.apache.cassandra.utils.MerkleTree.RowHash;
@@ -400,7 +400,7 @@ public class MerkleTreeTest
MerkleTree.serializer.serialize(mt, out, MessagingService.current_version);
byte[] serialized = out.toByteArray();
- ByteArrayDataInput in = ByteStreams.newDataInput(serialized);
+ DataInputPlus in = new NIODataInputStream(serialized);
MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version);
assertHashEquals(initialhash, restored.hash(full));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index 0775246..f3809b3 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -23,9 +23,9 @@ import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
-
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
@@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester
if (EXECUTE_WRITES)
testEstimatedHistogramWrite();
- try (DataInputStream in = getInput("utils.EstimatedHistogram.bin"))
+ try (DataInputStreamPlus in = getInput("utils.EstimatedHistogram.bin"))
{
Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in));
Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index 38b2f04..0ea25da 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -24,8 +24,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.junit.Test;
-
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.NIODataInputStream;
import static org.junit.Assert.assertEquals;
@@ -103,7 +103,7 @@ public class StreamingHistogramTest
StreamingHistogram.serializer.serialize(hist, out);
byte[] bytes = out.toByteArray();
- StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+ StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new NIODataInputStream(bytes));
// deserialized histogram should have following values
Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);