You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/02/02 12:13:26 UTC

[cassandra] branch trunk updated: CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35dbcc2  CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt
35dbcc2 is described below

commit 35dbcc2c2dbe1c826fd6ecd6e8357f0f5a9bab02
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Sun Oct 31 16:48:52 2021 +0100

    CASSANDRA-15215 Use DataOutputPlus.writeBytes in VIntCoding.writeUnsignedVInt
    
    In the cases where VInt occupies less than or equal to 8 bytes
    and the underlying buffer has at least 8 bytes, VIntCoding writes the
    entire register in a single operation and then adjusts the buffer position.
    
    Co-authored-by: Benedict Elliott Smith <be...@apache.org>
    Co-authored-by: Branimir Lambov <br...@datastax.com>
---
 .../cassandra/cql3/functions/types/TypeCodec.java  |  12 +-
 .../io/util/BufferedDataOutputStreamPlus.java      |  19 +-
 .../apache/cassandra/io/util/DataOutputPlus.java   |  48 +++
 .../apache/cassandra/utils/vint/VIntCoding.java    |  91 +++---
 .../cassandra/test/microbench/VIntCodingBench.java | 353 +++++++++++++++++++++
 .../io/util/BufferedDataOutputStreamTest.java      |  17 +
 .../apache/cassandra/io/util/DataOutputTest.java   |  32 +-
 .../cassandra/utils/vint/VIntCodingTest.java       |  79 ++++-
 8 files changed, 589 insertions(+), 62 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java b/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
index 2c15a25..dc34bca 100644
--- a/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
+++ b/src/java/org/apache/cassandra/cql3/functions/types/TypeCodec.java
@@ -31,7 +31,6 @@ import java.text.ParseException;
 import java.util.*;
 import java.util.regex.Pattern;
 
-import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import com.google.common.reflect.TypeToken;
 
@@ -3043,19 +3042,20 @@ public abstract class TypeCodec<T>
             VIntCoding.computeVIntSize(months)
             + VIntCoding.computeVIntSize(days)
             + VIntCoding.computeVIntSize(nanoseconds);
-            ByteArrayDataOutput out = ByteStreams.newDataOutput(size);
+            ByteBuffer bb = ByteBuffer.allocate(size);
             try
             {
-                VIntCoding.writeVInt(months, out);
-                VIntCoding.writeVInt(days, out);
-                VIntCoding.writeVInt(nanoseconds, out);
+                VIntCoding.writeVInt(months, bb);
+                VIntCoding.writeVInt(days, bb);
+                VIntCoding.writeVInt(nanoseconds, bb);
             }
             catch (IOException e)
             {
                 // cannot happen
                 throw new AssertionError();
             }
-            return ByteBuffer.wrap(out.toByteArray());
+            bb.flip();
+            return bb;
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index e56b7b0..4e9bbb5 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import net.nicoulaj.compilecommand.annotations.DontInline;
@@ -52,7 +53,8 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
     }
 
-    protected BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+    @VisibleForTesting
+    public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
     {
         super(channel);
         this.buffer = buffer;
@@ -146,6 +148,21 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     }
 
     @Override
+    public void writeBytes(long register, int bytes) throws IOException
+    {
+        if (buffer.remaining() < Long.BYTES) // not enough room to store a whole long in the buffer
+        {
+            super.writeBytes(register, bytes); // delegate to the superclass implementation
+        }
+        else // NOTE(review): bytes is not range-checked on this path; callers are expected to pass 0..8
+        {
+            int pos = buffer.position();
+            buffer.putLong(pos, register); // absolute put of all 8 bytes; the position is left untouched
+            buffer.position(pos + bytes); // advance past only the leading bytes that are being kept
+        }
+    }
+
+    @Override
     public void writeShort(int v) throws IOException
     {
         writeChar(v);
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index a8f545e..205dab7 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -60,6 +60,54 @@ public interface DataOutputPlus extends DataOutput
     }
 
     /**
+     * An efficient way to write the {@code bytes} most significant bytes of a long, most significant byte first.
+     *
+     * @param register - the long value whose leading (most significant) bytes are to be written
+     * @param bytes - the number of leading bytes to write. Valid values are between 0 and 8 inclusive; this default implementation throws IllegalArgumentException otherwise.
+     * @throws IOException if one of the underlying writes fails
+     */
+    default void writeBytes(long register, int bytes) throws IOException
+    {
+        switch (bytes)
+        {
+            case 0:
+                break; // nothing to write
+            case 1:
+                writeByte((int)(register >>> 56)); // top byte only
+                break;
+            case 2:
+                writeShort((int)(register >> 48));
+                break;
+            case 3:
+                writeShort((int)(register >> 48)); // bits 63..48
+                writeByte((int)(register >> 40)); // bits 47..40
+                break;
+            case 4:
+                writeInt((int)(register >> 32));
+                break;
+            case 5:
+                writeInt((int)(register >> 32));
+                writeByte((int)(register >> 24));
+                break;
+            case 6:
+                writeInt((int)(register >> 32));
+                writeShort((int)(register >> 16));
+                break;
+            case 7:
+                writeInt((int)(register >> 32)); // bits 63..32
+                writeShort((int)(register >> 16)); // bits 31..16
+                writeByte((int)(register >> 8)); // bits 15..8
+                break;
+            case 8:
+                writeLong(register);
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+    }
+
+    /**
      * Returns the current position of the underlying target like a file-pointer
      * or the position withing a buffer. Not every implementation may support this
      * functionality. Whether or not this functionality is supported can be checked
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 6961d9f..33def08 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -47,13 +47,12 @@
 package org.apache.cassandra.utils.vint;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import io.netty.util.concurrent.FastThreadLocal;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * Borrows idea from
@@ -173,26 +172,31 @@ public class VIntCoding
         return Integer.numberOfLeadingZeros(~firstByte) - 24;
     }
 
-    protected static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>()
-    {
-        @Override
-        public byte[] initialValue()
-        {
-            return new byte[9];
-        }
-    };
-
     @Inline
-    public static void writeUnsignedVInt(long value, DataOutput output) throws IOException
+    public static void writeUnsignedVInt(long value, DataOutputPlus output) throws IOException
     {
         int size = VIntCoding.computeUnsignedVIntSize(value);
         if (size == 1)
         {
-            output.write((int)value);
-            return;
+            output.writeByte((int) value); // single-byte encoding: the value itself
+        }
+        else if (size < 9)
+        {
+            int shift = (8 - size) << 3; // bits needed to left-align the size encoded bytes in a long
+            int extraBytes = size - 1;
+            long mask = (long)VIntCoding.encodeExtraBytesToRead(extraBytes) << 56; // length marker, placed in the top byte
+            long register = (value << shift) | mask;
+            output.writeBytes(register, size); // emits only the leading size bytes of the register
+        }
+        else if (size == 9)
+        {
+            output.write((byte) 0xFF); // 9-byte form: an all-ones first byte followed by the full long
+            output.writeLong(value);
+        }
+        else
+        {
+            throw new AssertionError();
         }
-
-        output.write(VIntCoding.encodeUnsignedVInt(value, size), 0, size);
     }
 
     @Inline
@@ -201,39 +205,41 @@ public class VIntCoding
         int size = VIntCoding.computeUnsignedVIntSize(value);
         if (size == 1)
         {
-            output.put((byte) value);
-            return;
+            output.put((byte) (value));
+        }
+        else if (size < 9)
+        {
+            int pos = output.position();
+            long register = (value << ((8 - size) << 3)) | ((long) VIntCoding.encodeExtraBytesToRead(size - 1) << 56);
+            if (output.limit() - pos >= Long.BYTES)
+            {
+                // room for a whole long: store the register at once, then keep only its first size bytes
+                output.putLong(pos, register);
+                output.position(pos + size);
+            }
+            else
+                for (int i = 0; i < size; i++) // too little room for putLong (e.g. exactly-sized buffer): go byte by byte
+                    output.put((byte) (register >>> (56 - (i << 3))));
+        }
+        else if (size == 9)
+        {
+            output.put((byte) 0xFF); // 9-byte form: an all-ones first byte followed by the full long
+            output.putLong(value);
+        }
+        else
+        {
+            throw new AssertionError();
         }
-
-        output.put(VIntCoding.encodeUnsignedVInt(value, size), 0, size);
-    }
-
-    /**
-     * @return a TEMPORARY THREAD LOCAL BUFFER containing the encoded bytes of the value
-     * This byte[] must be discarded by the caller immediately, and synchronously
-     */
-    @Inline
-    private static byte[] encodeUnsignedVInt(long value, int size)
-    {
-        byte[] encodingSpace = encodingBuffer.get();
-        encodeUnsignedVInt(value, size, encodingSpace);
-        return encodingSpace;
     }
 
     @Inline
-    private static void encodeUnsignedVInt(long value, int size, byte[] encodeInto)
+    public static void writeVInt(long value, DataOutputPlus output) throws IOException
     {
-        int extraBytes = size - 1;
-        for (int i = extraBytes ; i >= 0; --i)
-        {
-            encodeInto[i] = (byte) value;
-            value >>= 8;
-        }
-        encodeInto[0] |= VIntCoding.encodeExtraBytesToRead(extraBytes);
+        writeUnsignedVInt(encodeZigZag64(value), output);
     }
 
     @Inline
-    public static void writeVInt(long value, DataOutput output) throws IOException
+    public static void writeVInt(long value, ByteBuffer output) throws IOException
     {
         writeUnsignedVInt(encodeZigZag64(value), output);
     }
@@ -279,6 +285,7 @@ public class VIntCoding
     public static int computeUnsignedVIntSize(final long value)
     {
         int magnitude = Long.numberOfLeadingZeros(value | 1); // | with 1 to ensure magntiude <= 63, so (63 - 1) / 7 <= 8
-        return 9 - ((magnitude - 1) / 7);
+        // multiply-and-shift form that yields the same result as the original 9 - ((magnitude - 1) / 7) for every magnitude in [0, 63]
+        return (639 - magnitude * 9) >> 6;
     }
 }
diff --git a/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java b/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java
new file mode 100644
index 0000000..9c82236
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/VIntCodingBench.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.PrimitiveIterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3, jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class VIntCodingBench
+{
+    long oneByte = 53;
+    long twoBytes = 10201;
+    long threeBytes = 1097151L;
+    long fourBytes = 168435455L;
+    long fiveBytes = 33251130335L;
+    long sixBytes = 3281283447775L;
+    long sevenBytes = 417672546086779L;
+    long eightBytes = 52057592037927932L;
+    long nineBytes = 72057594037927937L;
+
+    final static String MONOMORPHIC = "monomorphic";
+    final static String BIMORPHIC = "bimorphic";
+    final static String MEGAMORPHIC = "megamorphic";
+
+    @Param({ MONOMORPHIC, BIMORPHIC, MEGAMORPHIC})
+    String allocation;
+
+    final Random random = new Random(100);
+    final PrimitiveIterator.OfLong longs = random.longs().iterator();
+    final static int BUFFER_SIZE = 8192;
+
+    ByteBuffer onheap = ByteBuffer.allocate(BUFFER_SIZE);
+    ByteBuffer direct = ByteBuffer.allocateDirect(BUFFER_SIZE);
+    File mmapedFile = new File(VIntCodingBench.class.getSimpleName() + "_mmap"); // Class.toString() would prepend "class " and the package
+    ByteBuffer mmaped = allocateMmapedByteBuffer(mmapedFile, BUFFER_SIZE);
+
+    @TearDown
+    public void tearDown()
+    {
+        mmapedFile.delete();
+    }
+
+    private static ByteBuffer allocateMmapedByteBuffer(File mmapFile, int bufferSize)
+    {
+        try(RandomAccessFile file = new RandomAccessFile(mmapFile, "rw");
+            FileChannel ch = file.getChannel())
+        {
+            return ch.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ByteBuffer getByteBuffer(String allocation)
+    {
+        ByteBuffer buffer;
+        if (allocation.equals(MONOMORPHIC))
+        {
+            buffer = onheap;
+        }
+        else if (allocation.equals(BIMORPHIC))
+        {
+            buffer = random.nextBoolean() ? onheap : direct;
+        }
+        else
+        {
+            switch(random.nextInt(3))
+            {
+                case 0:
+                    buffer = onheap;
+                    break;
+                case 1:
+                    buffer = direct;
+                    break;
+                default:
+                    buffer = mmaped;
+                    break;
+            }
+        }
+        return buffer;
+    }
+
+    private DataOutputPlus getBufferedDataOutput(Blackhole bh, ByteBuffer buffer)
+    {
+        WritableByteChannel wbc = new WritableByteChannel() {
+
+            @Override
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+            }
+
+            @Override
+            public int write(ByteBuffer src) throws IOException
+            {
+                bh.consume(src);
+                int remaining = src.remaining();
+                src.position(src.limit());
+                return remaining;
+            }
+        };
+        return new BufferedDataOutputStreamPlus(wbc, buffer);
+    }
+
+    @Benchmark
+    public void testWrite1ByteBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(oneByte, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite1ByteDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(oneByte, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite2BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(twoBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite2BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(twoBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite3BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(threeBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite3BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(threeBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite4BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(fourBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite4BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(fourBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite5BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(fiveBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite5BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(fiveBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite6BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(sixBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite6BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(sixBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite7BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(sevenBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite7BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(sevenBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite8BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(eightBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite8BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(eightBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite9BytesBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(nineBytes, buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWrite9BytesDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(nineBytes, out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWriteRandomLongBB(final Blackhole bh)
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        VIntCoding.writeUnsignedVInt(longs.nextLong(), buffer);
+        bh.consume(buffer);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testWriteRandomLongDOP(final Blackhole bh) throws IOException
+    {
+        ByteBuffer buffer = getByteBuffer(allocation);
+        DataOutputPlus out = getBufferedDataOutput(bh, buffer);
+        VIntCoding.writeUnsignedVInt(longs.nextLong(), out);
+        bh.consume(out);
+        buffer.clear();
+    }
+
+    @Benchmark
+    public void testComputeUnsignedVIntSize(final Blackhole bh)
+    {
+        bh.consume(VIntCoding.computeUnsignedVIntSize(longs.nextLong()));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index 66f506d..040a080 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.vint.VIntCoding;
 import org.junit.Test;
 
@@ -616,4 +617,20 @@ public class BufferedDataOutputStreamTest
         }
     }
 
+    @Test
+    public void testWriteBytes() throws Exception
+    {
+        setUp();
+        DataOutputStreamPlus dosp = new BufferedDataOutputStreamPlus(adapter, 8);
+        for (int i = 0; i < 1000; i++)
+        {
+            long val = r.nextLong();
+            int size = r.nextInt(9);
+            byte[] bytes = ByteBufferUtil.bytes(val).array();
+            canonical.write(bytes, 0, size);
+            dosp.writeBytes(val, size);
+        }
+        dosp.flush();
+        assertArrayEquals(canonical.toByteArray(), generated.toByteArray());
+    }
 }
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 5f3553b..41631af 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -29,6 +29,7 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -105,7 +106,7 @@ public class DataOutputTest
     @Test
     public void testDataOutputDirectByteBuffer() throws IOException
     {
-        ByteBuffer buf = wrap(new byte[345], true);
+        ByteBuffer buf = wrap(new byte[381], true);
         BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
         DataInput canon = testWrite(write);
         DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
@@ -115,7 +116,7 @@ public class DataOutputTest
     @Test
     public void testDataOutputHeapByteBuffer() throws IOException
     {
-        ByteBuffer buf = wrap(new byte[345], false);
+        ByteBuffer buf = wrap(new byte[381], false);
         BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
         DataInput canon = testWrite(write);
         DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
@@ -206,11 +207,11 @@ public class DataOutputTest
             checkThrowsException(validateReallocationCallable(write, DataOutputBuffer.MAX_ARRAY_SIZE + 1),
                                  BufferOverflowException.class);
             //Check that it does throw
-            checkThrowsException(() -> 
+            checkThrowsException(() ->
                                  {
                                      write.write(42);
                                      return null;
-                                 }, 
+                                 },
                                  BufferOverflowException.class);
         }
     }
@@ -310,8 +311,8 @@ public class DataOutputTest
         try (SafeMemoryWriter write = new SafeMemoryWriter(10))
         {
             DataInput canon = testWrite(write);
-            byte[] bytes = new byte[345];
-            write.currentBuffer().getBytes(0, bytes, 0, 345);
+            byte[] bytes = new byte[381];
+            write.currentBuffer().getBytes(0, bytes, 0, 381);
             DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
             testRead(test, canon);
         }
@@ -460,6 +461,21 @@ public class DataOutputTest
             canon.writeFloat(v);
         }
 
+        byte[] rndBytes = new byte[Long.BYTES];
+        for (int i = 1; i <= Long.BYTES; i++)
+        {
+            Arrays.fill(rndBytes, 0, rndBytes.length, (byte) 0);
+            rnd.nextBytes(rndBytes);
+            // keep only first i random bytes
+            Arrays.fill(rndBytes,  i, rndBytes.length, (byte) 0);
+            long val = ByteBufferUtil.toLong(ByteBuffer.wrap(rndBytes));
+            test.writeBytes(val, i);
+            byte[] arr = new byte[i];
+            System.arraycopy(rndBytes, 0, arr, 0, i);
+            canon.write(arr);
+        }
+
+
         // 27
         return new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
     }
@@ -478,6 +494,10 @@ public class DataOutputTest
         assert test.readByte() == canon.readByte();
         assert test.readDouble() == canon.readDouble();
         assert test.readFloat() == canon.readFloat();
+        for (int i = 1; i <= Long.BYTES; i++)
+        {
+            Assert.assertArrayEquals(ByteBufferUtil.readBytes(canon, i), ByteBufferUtil.readBytes(test, i));
+        }
         try
         {
             test.readInt();
diff --git a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
index 7f1863e..bb85b94 100644
--- a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
+++ b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
@@ -18,11 +18,15 @@
 */
 package org.apache.cassandra.utils.vint;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 
 import org.junit.Test;
 
@@ -30,6 +34,9 @@ import org.junit.Assert;
 
 public class VIntCodingTest
 {
+    private static final long[] LONGS = new long[] {53L, 10201L, 1097151L,
+                                                    168435455L, 33251130335L, 3281283447775L,
+                                                    417672546086779L, 52057592037927932L, 72057594037927937L};
 
     @Test
     public void testComputeSize() throws Exception
@@ -48,9 +55,9 @@ public class VIntCodingTest
     {
         Assert.assertEquals(expectedSize, VIntCoding.computeUnsignedVIntSize(value));
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        VIntCoding.writeUnsignedVInt(value, dos);
-        dos.flush();
+        WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos);
+        VIntCoding.writeUnsignedVInt(value, out);
+        out.flush();
         Assert.assertEquals( expectedSize, baos.toByteArray().length);
 
         DataOutputBuffer dob = new DataOutputBuffer();
@@ -74,9 +81,9 @@ public class VIntCodingTest
         int biggestOneByte = 127;
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        VIntCoding.writeUnsignedVInt(biggestOneByte, dos);
-        dos.flush();
+        WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos);
+        VIntCoding.writeUnsignedVInt(biggestOneByte, out);
+        out.flush();
         Assert.assertEquals( 1, baos.toByteArray().length);
 
         DataOutputBuffer dob = new DataOutputBuffer();
@@ -96,4 +103,62 @@ public class VIntCodingTest
             Assert.assertEquals(i, result);
         }
     }
+
+    @Test
+    public void testWriteUnsignedVIntBufferedDOP() throws IOException
+    {
+        for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+        {
+            long val = LONGS[i];
+            Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+            try (DataOutputBuffer out = new DataOutputBuffer())
+            {
+                VIntCoding.writeUnsignedVInt(val, out);
+                // read as ByteBuffer
+                Assert.assertEquals(val, VIntCoding.getUnsignedVInt(out.buffer(), 0));
+                // read as DataInput
+                InputStream is = new ByteArrayInputStream(out.toByteArray());
+                Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+            }
+        }
+    }
+
+    @Test
+    public void testWriteUnsignedVIntUnbufferedDOP() throws IOException
+    {
+        for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+        {
+            long val = LONGS[i];
+            Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (WrappedDataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
+            {
+                VIntCoding.writeUnsignedVInt(val, out);
+                out.flush();
+                Assert.assertEquals( i + 1, baos.toByteArray().length);
+                // read as ByteBuffer
+                Assert.assertEquals(val, VIntCoding.getUnsignedVInt(ByteBuffer.wrap(baos.toByteArray()), 0));
+                // read as DataInput
+                InputStream is = new ByteArrayInputStream(baos.toByteArray());
+                Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+            }
+        }
+    }
+
+    @Test
+    public void testWriteUnsignedVIntBB() throws IOException
+    {
+        for (int i = 0; i < VIntCoding.MAX_SIZE - 1; i++)
+        {
+            long val = LONGS[i];
+            Assert.assertEquals(i + 1, VIntCoding.computeUnsignedVIntSize(val));
+            ByteBuffer bb = ByteBuffer.allocate(VIntCoding.MAX_SIZE);
+            VIntCoding.writeUnsignedVInt(val, bb);
+            // read as ByteBuffer
+            Assert.assertEquals(val, VIntCoding.getUnsignedVInt(bb, 0));
+            // read as DataInput
+            InputStream is = new ByteArrayInputStream(bb.array());
+            Assert.assertEquals(val, VIntCoding.readUnsignedVInt(new DataInputPlus.DataInputStreamPlus(is)));
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org