You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/02/02 12:13:26 UTC
[cassandra] branch trunk updated: CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35dbcc2 CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt
35dbcc2 is described below
commit 35dbcc2c2dbe1c826fd6ecd6e8357f0f5a9bab02
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Sun Oct 31 16:48:52 2021 +0100
CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt
In the cases where VInt occupies less than or equal to 8 bytes
and the underlying buffer has at least 8 bytes, VIntCoding writes the
entire register in a single operation and then adjusts the buffer position.
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Co-authored-by: Branimir Lambov <br...@datastax.com>
---
.../cassandra/cql3/functions/types/TypeCodec.java | 12 +-
.../io/util/BufferedDataOutputStreamPlus.java | 19 +-
.../apache/cassandra/io/util/DataOutputPlus.java | 48 +++
.../apache/cassandra/utils/vint/VIntCoding.java | 91 +++---
.../cassandra/test/microbench/VIntCodingBench.java | 353 +++++++++++++++++++++
.../io/util/BufferedDataOutputStreamTest.java | 17 +
.../apache/cassandra/io/util/DataOutputTest.java | 32 +-
.../cassandra/utils/vint/VIntCodingTest.java | 79 ++++-
8 files changed, 589 insertions(+), 62 deletions(-)
diff --git a/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java b/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
index 2c15a25..dc34bca 100644
--- a/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
+++ b/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
@@ -31,7 +31,6 @@ import java.text.ParseException;
import java.util.*;
import java.util.regex.Pattern;
-import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.TypeToken;
@@ -3043,19 +3042,20 @@ public abstract class TypeCodec<T>
VIntCoding.computeVIntSize(months)
+ VIntCoding.computeVIntSize(days)
+ VIntCoding.computeVIntSize(nanoseconds);
- ByteArrayDataOutput out = ByteStreams.newDataOutput(size);
+ ByteBuffer bb = ByteBuffer.allocate(size);
try
{
- VIntCoding.writeVInt(months, out);
- VIntCoding.writeVInt(days, out);
- VIntCoding.writeVInt(nanoseconds, out);
+ VIntCoding.writeVInt(months, bb);
+ VIntCoding.writeVInt(days, bb);
+ VIntCoding.writeVInt(nanoseconds, bb);
}
catch (IOException e)
{
// cannot happen
throw new AssertionError();
}
- return ByteBuffer.wrap(out.toByteArray());
+ bb.flip();
+ return bb;
}
@Override
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index e56b7b0..4e9bbb5 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import net.nicoulaj.compilecommand.annotations.DontInline;
@@ -52,7 +53,8 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
}
- protected BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+ @VisibleForTesting
+ public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
{
super(channel);
this.buffer = buffer;
@@ -146,6 +148,21 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
}
@Override
+ public void writeBytes(long register, int bytes) throws IOException
+ {
+ if (buffer.remaining() < Long.BYTES)
+ {
+ super.writeBytes(register, bytes);
+ }
+ else
+ {
+ int pos = buffer.position();
+ buffer.putLong(pos, register);
+ buffer.position(pos + bytes);
+ }
+ }
+
+ @Override
public void writeShort(int v) throws IOException
{
writeChar(v);
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index a8f545e..205dab7 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -60,6 +60,54 @@ public interface DataOutputPlus extends DataOutput
}
/**
+ * An efficient way to write the type {@code bytes} of a long
+ *
+ * @param register - the long value to be written
+ * @param bytes - the number of bytes the register occupies. Valid values are between 1 and 8 inclusive.
+ * @throws IOException
+ */
+ default void writeBytes(long register, int bytes) throws IOException
+ {
+ switch (bytes)
+ {
+ case 0:
+ break;
+ case 1:
+ writeByte((int)(register >>> 56));
+ break;
+ case 2:
+ writeShort((int)(register >> 48));
+ break;
+ case 3:
+ writeShort((int)(register >> 48));
+ writeByte((int)(register >> 40));
+ break;
+ case 4:
+ writeInt((int)(register >> 32));
+ break;
+ case 5:
+ writeInt((int)(register >> 32));
+ writeByte((int)(register >> 24));
+ break;
+ case 6:
+ writeInt((int)(register >> 32));
+ writeShort((int)(register >> 16));
+ break;
+ case 7:
+ writeInt((int)(register >> 32));
+ writeShort((int)(register >> 16));
+ writeByte((int)(register >> 8));
+ break;
+ case 8:
+ writeLong(register);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ }
+
+ /**
* Returns the current position of the underlying target like a file-pointer
* or the position withing a buffer. Not every implementation may support this
* functionality. Whether or not this functionality is supported can be checked
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 6961d9f..33def08 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -47,13 +47,12 @@
package org.apache.cassandra.utils.vint;
import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import io.netty.util.concurrent.FastThreadLocal;
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
/**
* Borrows idea from
@@ -173,26 +172,31 @@ public class VIntCoding
return Integer.numberOfLeadingZeros(~firstByte) - 24;
}
- protected static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>()
- {
- @Override
- public byte[] initialValue()
- {
- return new byte[9];
- }
- };
-
@Inline
- public static void writeUnsignedVInt(long value, DataOutput output) throws IOException
+ public static void writeUnsignedVInt(long value, DataOutputPlus output) throws IOException
{
int size = VIntCoding.computeUnsignedVIntSize(value);
if (size == 1)
{
- output.write((int)value);
- return;
+ output.writeByte((int) value);
+ }
+ else if (size < 9)
+ {
+ int shift = (8 - size) << 3;
+ int extraBytes = size - 1;
+ long mask = (long)VIntCoding.encodeExtraBytesToRead(extraBytes) << 56;
+ long register = (value << shift) | mask;
+ output.writeBytes(register, size);
+ }
+ else if (size == 9)
+ {
+ output.write((byte) 0xFF);
+ output.writeLong(value);
+ }
+ else
+ {
+ throw new AssertionError();
}
-
- output.write(VIntCoding.encodeUnsignedVInt(value, size), 0, size);
}
@Inline
@@ -201,39 +205,41 @@ public class VIntCoding
int size = VIntCoding.computeUnsignedVIntSize(value);
if (size == 1)
{
- output.put((byte) value);
- return;
+ output.put((byte) (value));
+ }
+ else if (size < 9)
+ {
+ int limit = output.limit();
+ int pos = output.position();
+ if (limit - pos >= size)
+ {
+ int shift = (8 - size) << 3;
+ int extraBytes = size - 1;
+ long mask = (long)VIntCoding.encodeExtraBytesToRead(extraBytes) << 56;
+ long register = (value << shift) | mask;
+ output.putLong(pos, register);
+ output.position(pos + size);
+ }
+ }
+ else if (size == 9)
+ {
+ output.put((byte) 0xFF);
+ output.putLong(value);
+ }
+ else
+ {
+ throw new AssertionError();
}
-
- output.put(VIntCoding.encodeUnsignedVInt(value, size), 0, size);
- }
-
- /**
- * @return a TEMPORARY THREAD LOCAL BUFFER containing the encoded bytes of the value
- * This byte[] must be discarded by the caller immediately, and synchronously
- */
- @Inline
- private static byte[] encodeUnsignedVInt(long value, int size)
- {
- byte[] encodingSpace = encodingBuffer.get();
- encodeUnsignedVInt(value, size, encodingSpace);
- return encodingSpace;
}
@Inline
- private static void encodeUnsignedVInt(long value, int size, byte[] encodeInto)
+ public static void writeVInt(long value, DataOutputPlus output) throws IOException
{
- int extraBytes = size - 1;
- for (int i = extraBytes ; i >= 0; --i)
- {
- encodeInto[i] = (byte) value;
- value >>= 8;
- }
- encodeInto[0] |= VIntCoding.encodeExtraBytesToRead(extraBytes);
+ writeUnsignedVInt(encodeZigZag64(value), output);
}
@Inline
- public static void writeVInt(long value, DataOutput output) throws IOException
+ public static void writeVInt(long value, ByteBuffer output) throws IOException
{
writeUnsignedVInt(encodeZigZag64(value), output);
}
@@ -279,6 +285,7 @@ public class VIntCoding
public static int computeUnsignedVIntSize(final long value)
{
int magnitude = Long.numberOfLeadingZeros(value | 1); // | with 1 to ensure magntiude <= 63, so (63 - 1) / 7 <= 8
- return 9 - ((magnitude - 1) / 7);
+ // the formula below is hand-picked to match the original 9 - ((magnitude - 1) / 7)
+ return (639 - magnitude * 9) >> 6;
}
}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java b/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java
new file mode 100644
index 0000000..9c82236
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.PrimitiveIterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3, jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class VIntCodingBench
+{
+ long oneByte = 53;
+ long twoBytes = 10201;
+ long threeBytes = 1097151L;
+ long fourBytes = 168435455L;
+ long fiveBytes = 33251130335L;
+ long sixBytes = 3281283447775L;
+ long sevenBytes = 417672546086779L;
+ long eightBytes = 52057592037927932L;
+ long nineBytes = 72057594037927937L;
+
+ final static String MONOMORPHIC = "monomorphic";
+ final static String BIMORPHIC = "bimorphic";
+ final static String MEGAMORPHIC = "megamorphic";
+
+ @Param({ MONOMORPHIC, BIMORPHIC, MEGAMORPHIC})
+ String allocation;
+
+ final Random random = new Random(100);
+ final PrimitiveIterator.OfLong longs = random.longs().iterator();
+ final static int BUFFER_SIZE = 8192;
+
+ ByteBuffer onheap = ByteBuffer.allocate(BUFFER_SIZE);
+ ByteBuffer direct = ByteBuffer.allocateDirect(BUFFER_SIZE);
+ File mmapedFile = new File(VIntCodingBench.class + "_mmap");
+ ByteBuffer mmaped = allocateMmapedByteBuffer(mmapedFile, BUFFER_SIZE);
+
+ @TearDown
+ public void tearDown()
+ {
+ mmapedFile.delete();
+ }
+
+ private static ByteBuffer allocateMmapedByteBuffer(File mmapFile, int bufferSize)
+ {
+ try(RandomAccessFile file = new RandomAccessFile(mmapFile, "rw");
+ FileChannel ch = file.getChannel())
+ {
+ return ch.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ByteBuffer getByteBuffer(String allocation)
+ {
+ ByteBuffer buffer;
+ if (allocation.equals(MONOMORPHIC))
+ {
+ buffer = onheap;
+ }
+ else if (allocation.equals(BIMORPHIC))
+ {
+ buffer = random.nextBoolean() ? onheap : direct;
+ }
+ else
+ {
+ switch(random.nextInt(3))
+ {
+ case 0:
+ buffer = onheap;
+ break;
+ case 1:
+ buffer = direct;
+ break;
+ default:
+ buffer = mmaped;
+ break;
+ }
+ }
+ return buffer;
+ }
+
+ private DataOutputPlus getBufferedDataOutput(Blackhole bh, ByteBuffer buffer)
+ {
+ WritableByteChannel wbc = new WritableByteChannel() {
+
+ @Override
+ public boolean isOpen()
+ {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ bh.consume(src);
+ int remaining = src.remaining();
+ src.position(src.limit());
+ return remaining;
+ }
+ };
+ return new BufferedDataOutputStreamPlus(wbc, buffer);
+ }
+
+ @Benchmark
+ public void testWrite1ByteBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(oneByte, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite1ByteDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(oneByte, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite2BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(twoBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite2BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(twoBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite3BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(threeBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite3BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(threeBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite4BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(fourBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite4BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(fourBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite5BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(fiveBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite5BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(fiveBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite6BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(sixBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite6BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(sixBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite7BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(sevenBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite7BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(sevenBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite8BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(eightBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite8BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(eightBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite9BytesBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(nineBytes, buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWrite9BytesDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(nineBytes, out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWriteRandomLongBB(final Blackhole bh)
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ VIntCoding.writeUnsignedVInt(longs.nextLong(), buffer);
+ bh.consume(buffer);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testWriteRandomLongDOP(final Blackhole bh) throws IOException
+ {
+ ByteBuffer buffer = getByteBuffer(allocation);
+ DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+ VIntCoding.writeUnsignedVInt(longs.nextLong(), out);
+ bh.consume(out);
+ buffer.clear();
+ }
+
+ @Benchmark
+ public void testComputeUnsignedVIntSize(final Blackhole bh)
+ {
+ bh.consume(VIntCoding.computeUnsignedVIntSize(longs.nextLong()));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index 66f506d..040a080 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Random;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.vint.VIntCoding;
import org.junit.Test;
@@ -616,4 +617,20 @@ public class BufferedDataOutputStreamTest
}
}
+ @Test
+ public void testWriteBytes() throws Exception
+ {
+ setUp();
+ DataOutputStreamPlus dosp = new BufferedDataOutputStreamPlus(adapter, 8);
+ for (int i = 0; i < 1000; i++)
+ {
+ long val = r.nextLong();
+ int size = r.nextInt(9);
+ byte[] bytes = ByteBufferUtil.bytes(val).array();
+ canonical.write(bytes, 0, size);
+ dosp.writeBytes(val, size);
+ }
+ dosp.flush();
+ assertArrayEquals(canonical.toByteArray(), generated.toByteArray());
+ }
}
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 5f3553b..41631af 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -29,6 +29,7 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Deque;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -105,7 +106,7 @@ public class DataOutputTest
@Test
public void testDataOutputDirectByteBuffer() throws IOException
{
- ByteBuffer buf = wrap(new byte[345], true);
+ ByteBuffer buf = wrap(new byte[381], true);
BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
DataInput canon = testWrite(write);
DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
@@ -115,7 +116,7 @@ public class DataOutputTest
@Test
public void testDataOutputHeapByteBuffer() throws IOException
{
- ByteBuffer buf = wrap(new byte[345], false);
+ ByteBuffer buf = wrap(new byte[381], false);
BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
DataInput canon = testWrite(write);
DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
@@ -206,11 +207,11 @@ public class DataOutputTest
checkThrowsException(validateReallocationCallable(write, DataOutputBuffer.MAX_ARRAY_SIZE + 1),
BufferOverflowException.class);
//Check that it does throw
- checkThrowsException(() ->
+ checkThrowsException(() ->
{
write.write(42);
return null;
- },
+ },
BufferOverflowException.class);
}
}
@@ -310,8 +311,8 @@ public class DataOutputTest
try (SafeMemoryWriter write = new SafeMemoryWriter(10))
{
DataInput canon = testWrite(write);
- byte[] bytes = new byte[345];
- write.currentBuffer().getBytes(0, bytes, 0, 345);
+ byte[] bytes = new byte[381];
+ write.currentBuffer().getBytes(0, bytes, 0, 381);
DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
testRead(test, canon);
}
@@ -460,6 +461,21 @@ public class DataOutputTest
canon.writeFloat(v);
}
+ byte[] rndBytes = new byte[Long.BYTES];
+ for (int i = 1; i <= Long.BYTES; i++)
+ {
+ Arrays.fill(rndBytes, 0, rndBytes.length, (byte) 0);
+ rnd.nextBytes(rndBytes);
+ // keep only first i random bytes
+ Arrays.fill(rndBytes, i, rndBytes.length, (byte) 0);
+ long val = ByteBufferUtil.toLong(ByteBuffer.wrap(rndBytes));
+ test.writeBytes(val, i);
+ byte[] arr = new byte[i];
+ System.arraycopy(rndBytes, 0, arr, 0, i);
+ canon.write(arr);
+ }
+
+
// 27
return new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
}
@@ -478,6 +494,10 @@ public class DataOutputTest
assert test.readByte() == canon.readByte();
assert test.readDouble() == canon.readDouble();
assert test.readFloat() == canon.readFloat();
+ for (int i = 1; i <= Long.BYTES; i++)
+ {
+ Assert.assertArrayEquals(ByteBufferUtil.readBytes(canon, i), ByteBufferUtil.readBytes(test, i));
+ }
try
{
test.readInt();
diff --git a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
index 7f1863e..bb85b94 100644
--- a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
+++ b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
@@ -18,11 +18,15 @@
*/
package org.apache.cassandra.utils.vint;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.junit.Test;
@@ -30,6 +34,9 @@ import org.junit.Assert;
public class VIntCodingTest
{
+ private static final long[] LONGS = new long[] {53L, 10201L, 1097151L,
+ 168435455L, 33251130335L, 3281283447775L,
+ 417672546086779L, 52057592037927932L, 72057594037927937L};
@Test
public void testComputeSize() throws Exception
@@ -48,9 +55,9 @@ public class VIntCodingTest
{
Assert.assertEquals(expectedSize, VIntCoding.computeUnsignedVIntSize(value));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- VIntCoding.writeUnsignedVInt(value, dos);
- dos.flush();
+ WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos);
+ VIntCoding.writeUnsignedVInt(value, out);
+ out.flush();
Assert.assertEquals( expectedSize, baos.toByteArray().length);
DataOutputBuffer dob = new DataOutputBuffer();
@@ -74,9 +81,9 @@ public class VIntCodingTest
int biggestOneByte = 127;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- VIntCoding.writeUnsignedVInt(biggestOneByte, dos);
- dos.flush();
+ WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos);
+ VIntCoding.writeUnsignedVInt(biggestOneByte, out);
+ out.flush();
Assert.assertEquals( 1, baos.toByteArray().length);
DataOutputBuffer dob = new DataOutputBuffer();
@@ -96,4 +103,62 @@ public class VIntCodingTest
Assert.assertEquals(i, result);
}
}
+
+ @Test
+ public void testWriteUnsignedVIntBufferedDOP() throws IOException
+ {
+ for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+ {
+ long val = LONGS[i];
+ Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ VIntCoding.writeUnsignedVInt(val, out);
+ // read as ByteBuffer
+ Assert.assertEquals(val, VIntCoding.getUnsignedVInt(out.buffer(), 0));
+ // read as DataInput
+ InputStream is = new ByteArrayInputStream(out.toByteArray());
+ Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+ }
+ }
+ }
+
+ @Test
+ public void testWriteUnsignedVIntUnbufferedDOP() throws IOException
+ {
+ for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+ {
+ long val = LONGS[i];
+ Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
+ {
+ VIntCoding.writeUnsignedVInt(val, out);
+ out.flush();
+ Assert.assertEquals( i + 1, baos.toByteArray().length);
+ // read as ByteBuffer
+ Assert.assertEquals(val, VIntCoding.getUnsignedVInt(ByteBuffer.wrap(baos.toByteArray()), 0));
+ // read as DataInput
+ InputStream is = new ByteArrayInputStream(baos.toByteArray());
+ Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+ }
+ }
+ }
+
+ @Test
+ public void testWriteUnsignedVIntBB() throws IOException
+ {
+ for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+ {
+ long val = LONGS[i];
+ Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+ ByteBuffer bb = ByteBuffer.allocate(VIntCoding.MAX_SIZE);
+ VIntCoding.writeUnsignedVInt(val, bb);
+ // read as ByteBuffer
+ Assert.assertEquals(val, VIntCoding.getUnsignedVInt(bb, 0));
+ // read as DataInput
+ InputStream is = new ByteArrayInputStream(bb.array());
+ Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org