You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/11 17:43:06 UTC

[bookkeeper] branch branch-4.7 updated: [TABLE SERVICE] Fix StringUtf8Coder and add VarIntCoder

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new d860011  [TABLE SERVICE] Fix StringUtf8Coder and add VarIntCoder
d860011 is described below

commit d8600116734b1bd271719cd90f81c5bb04d3ab02
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 11 10:41:31 2018 -0700

    [TABLE SERVICE] Fix StringUtf8Coder and add VarIntCoder
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    The size returned by StringUtf8Coder#getSerializedSize() is different from the serialized buffer size.
    
    *Changes*
    
    - Fix StringUtf8Coder
    - Add tests to coders
    - Add a VarIntCoder
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1507 from sijie/fix_string_coder
    
    (cherry picked from commit 42f6ea63ce6cd00cc699ff54272437aebbb985ca)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/common/coder/Coder.java  |  14 +-
 .../bookkeeper/common/coder/StringUtf8Coder.java   |  12 +-
 .../bookkeeper/common/coder/VarIntCoder.java       |  77 ++++++
 .../org/apache/bookkeeper/common/util/VarInt.java  | 138 ++++++++++
 .../common/coder/CoderBasicTestCase.java           |  51 ++++
 .../common/coder/TestByteArrayCoder.java           |  59 +++++
 .../common/coder/TestStringUtf8Coder.java          |  44 ++++
 .../bookkeeper/common/coder/TestVarIntCoder.java   |  43 ++++
 .../apache/bookkeeper/common/util/TestVarInt.java  | 277 +++++++++++++++++++++
 9 files changed, 706 insertions(+), 9 deletions(-)

diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/coder/Coder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/Coder.java
index dc1ea71..ee2b8db 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/coder/Coder.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/Coder.java
@@ -37,7 +37,11 @@ public interface Coder<T> extends Serializable {
      * @return the serialized bytes buf.
      */
     default ByteBuf encodeBuf(T value) {
-        return Unpooled.wrappedBuffer(encode(value));
+        int len = getSerializedSize(value);
+        ByteBuf buffer = Unpooled.buffer(len, len);
+        buffer.setIndex(0, 0);
+        encode(value, buffer);
+        return buffer;
     }
 
     /**
@@ -46,7 +50,13 @@ public interface Coder<T> extends Serializable {
      * @param value value to encode
      * @return the serialized bytes bytes.
      */
-    byte[] encode(T value);
+    default byte[] encode(T value) {
+        byte[] data = new byte[getSerializedSize(value)];
+        ByteBuf buf = Unpooled.wrappedBuffer(data);
+        buf.setIndex(0, 0);
+        encode(value, buf);
+        return data;
+    }
 
     /**
      * Encodes the given value of type {@code T} into the <tt>destBuf</tt>.
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/coder/StringUtf8Coder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/StringUtf8Coder.java
index 919a90c..5d661d5 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/coder/StringUtf8Coder.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/StringUtf8Coder.java
@@ -18,7 +18,10 @@
 
 package org.apache.bookkeeper.common.coder;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import com.google.common.base.Charsets;
+import com.google.common.base.Utf8;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import lombok.AccessLevel;
@@ -42,18 +45,13 @@ public final class StringUtf8Coder implements Coder<String> {
     private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
 
     @Override
-    public byte[] encode(String value) {
-        return value.getBytes(Charsets.UTF_8);
-    }
-
-    @Override
     public void encode(String value, ByteBuf destBuf) {
-        destBuf.writeCharSequence(value, Charsets.UTF_8);
+        destBuf.writeBytes(value.getBytes(UTF_8));
     }
 
     @Override
     public int getSerializedSize(String value) {
-        return value.length();
+        return Utf8.encodedLength(value);
     }
 
     @Override
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/coder/VarIntCoder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/VarIntCoder.java
new file mode 100644
index 0000000..9f93757
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/coder/VarIntCoder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.coder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import java.io.IOException;
+import org.apache.bookkeeper.common.util.VarInt;
+
+/**
+ * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative
+ * numbers always take 5 bytes.
+ */
+public class VarIntCoder implements Coder<Integer> {
+
+    private static final long serialVersionUID = 465214482437322885L;
+
+    public static VarIntCoder of() {
+        return INSTANCE;
+    }
+
+    private static final VarIntCoder INSTANCE = new VarIntCoder();
+
+    private VarIntCoder() {
+    }
+
+    @Override
+    public void encode(Integer value, ByteBuf buf) {
+        checkNotNull(value, "Can not encode a null integer value");
+        checkNotNull(buf, "Can not encode into a null output buffer");
+
+        ByteBufOutputStream output = new ByteBufOutputStream(buf);
+
+        try {
+            VarInt.encode(value.intValue(), output);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to encode integer '" + value + "' into the provided buffer", e);
+        }
+    }
+
+    @Override
+    public int getSerializedSize(Integer value) {
+        return VarInt.getLength(value);
+    }
+
+    @Override
+    public Integer decode(ByteBuf buf) {
+        checkNotNull(buf, "Can not decode into a null input buffer");
+        // Wrap the buffer so that VarInt can consume it through the InputStream API.
+        ByteBufInputStream input = new ByteBufInputStream(buf);
+        // Rethrow unchecked, keeping the IOException as the cause so EOF/overflow details are not lost.
+        try {
+            return VarInt.decodeInt(input);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to decode an integer from the provided buffer", e);
+        }
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/util/VarInt.java b/stream/common/src/main/java/org/apache/bookkeeper/common/util/VarInt.java
new file mode 100644
index 0000000..8544c24
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/util/VarInt.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Variable-length encoding for integers.
+ *
+ * <p>Handles, in a common encoding format, signed bytes, shorts, ints, and longs.
+ * Takes between 1 and 10 bytes.
+ * Less efficient than BigEndian{Int,Long} coder for negative or large numbers.
+ * All negative ints are encoded using 5 bytes; all negative longs take 10 bytes.
+ */
+public class VarInt {
+
+    private static long convertIntToLongNoSignExtend(int v) {
+        return v & 0xFFFFFFFFL;
+    }
+
+    /**
+     * Encodes the given value onto the buffer.
+     *
+     * @param v   the value
+     * @param buf the buffer
+     * @throws IOException the io exception
+     */
+    public static void encode(int v, OutputStream buf) throws IOException {
+        encode(convertIntToLongNoSignExtend(v), buf);
+    }
+
+    /**
+     * Writes the value as a varint: 7 bits per byte, lowest bits first, high bit set on all but the last byte.
+     *
+     * @param v   the value; shifted unsigned, so a negative long is written using all 64 bits
+     * @param buf the stream that receives the encoded bytes
+     * @throws IOException if writing to the stream fails
+     */
+    public static void encode(long v, OutputStream buf) throws IOException {
+        do {
+            // Encode next 7 bits + terminator bit
+            long bits = v & 0x7F;
+            v >>>= 7;
+            int b = (int) (bits | ((v != 0) ? 0x80 : 0));
+            buf.write(b);
+        } while (v != 0);
+    }
+
+    /**
+     * Decodes a varint from the given stream and narrows it to an {@code int}.
+     *
+     * @param buf the stream to read the encoded bytes from
+     * @return the decoded value; results in [2^31, 2^32) are narrowed by the cast to negative ints
+     * @throws IOException if the decoded long is negative or at least 2^32, or if {@link #decodeLong} fails
+     */
+    public static int decodeInt(InputStream buf) throws IOException {
+        long r = decodeLong(buf);
+        if (r < 0 || r >= 1L << 32) {
+            throw new IOException("var int overflow " + r);
+        }
+        return (int) r;
+    }
+
+    /**
+     * Decodes a varint from the given stream, taking 7 bits per byte (lowest bits first) while the high bit is set.
+     *
+     * @param buf the stream to read the encoded bytes from
+     * @return the decoded long value
+     * @throws IOException if the input ends mid-value or is too long ({@link EOFException} if no byte was read)
+     */
+    public static long decodeLong(InputStream buf) throws IOException {
+        long result = 0;
+        int shift = 0;
+        int b;
+        do {
+            // Read the next byte; a negative return value means the stream is exhausted
+            b = buf.read();
+            if (b < 0) {
+                if (shift == 0) {
+                    throw new EOFException();
+                } else {
+                    throw new IOException("varint not terminated");
+                }
+            }
+            long bits = b & 0x7F;
+            if (shift >= 64 || (shift == 63 && bits > 1)) {
+                // Out of range: at shift 63 only one bit still fits in a long, and nothing fits beyond that
+                throw new IOException("varint too long");
+            }
+            result |= bits << shift;
+            shift += 7;
+        } while ((b & 0x80) != 0);
+        return result;
+    }
+
+    /**
+     * Returns the length of the encoding of the given value (in bytes).
+     *
+     * @param v the value
+     * @return the length
+     */
+    public static int getLength(int v) {
+        return getLength(convertIntToLongNoSignExtend(v));
+    }
+
+    /**
+     * Returns the length of the encoding of the given value (in bytes).
+     *
+     * @param v the value
+     * @return the length
+     */
+    public static int getLength(long v) {
+        int result = 0;
+        do {
+            result++;
+            v >>>= 7;
+        } while (v != 0);
+        return result;
+    }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/coder/CoderBasicTestCase.java b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/CoderBasicTestCase.java
new file mode 100644
index 0000000..6235234
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/CoderBasicTestCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.coder;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Basic Test Case for {@link Coder}s.
+ */
+public abstract class CoderBasicTestCase {
+
+    static <T> byte[] encode(Coder<T> coder, T value) {
+        return coder.encode(value);
+    }
+
+    static <T> T decode(Coder<T> coder, byte[] bytes) {
+        ByteBuf buf = Unpooled.wrappedBuffer(bytes);
+        buf.setIndex(0, bytes.length);
+        return coder.decode(buf);
+    }
+
+    private static <T> T decodeEncode(Coder<T> coder, T value) {
+        byte[] data = encode(coder, value);
+        assertEquals(coder.getSerializedSize(value), data.length);
+        return decode(coder, data);
+    }
+
+    public static <T> void coderDecodeEncodeEqual(Coder<T> coder, T value) {
+        assertThat(decodeEncode(coder, value), equalTo(value));
+    }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestByteArrayCoder.java b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestByteArrayCoder.java
new file mode 100644
index 0000000..6ab8d0c
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestByteArrayCoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.coder;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ByteArrayCoder}.
+ */
+public class TestByteArrayCoder extends CoderBasicTestCase {
+
+    private static final ByteArrayCoder TEST_CODER = ByteArrayCoder.of();
+
+    private static final List<byte[]> TEST_VALUES = Arrays.asList(
+        new byte[]{0xa, 0xb, 0xc},
+        new byte[]{0xd, 0x3},
+        new byte[]{0xd, 0xe},
+        new byte[]{});
+
+    @Test
+    public void testDecodeEncodeEquals() throws Exception {
+        for (byte[] value : TEST_VALUES) {
+            coderDecodeEncodeEqual(TEST_CODER, value);
+        }
+    }
+
+    @Test
+    public void testEncodeThenMutate() throws Exception {
+        byte[] input = {0x7, 0x3, 0xA, 0xf};
+        byte[] encoded = encode(TEST_CODER, input);
+        input[1] = 0x9;
+        byte[] decoded = decode(TEST_CODER, encoded);
+
+        // byte array coder that does encoding/decoding without copying
+        // the bytes, so mutating the input will mutate the output
+        assertThat(input, equalTo(decoded));
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestStringUtf8Coder.java b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestStringUtf8Coder.java
new file mode 100644
index 0000000..a554546
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestStringUtf8Coder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.coder;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit Tests for {@link StringUtf8Coder}.
+ */
+public class TestStringUtf8Coder extends CoderBasicTestCase {
+
+    private static final Coder<String> TEST_CODER = StringUtf8Coder.of();
+
+    private static final List<String> TEST_VALUES = Arrays.asList(
+        "", "a", "13", "hello",
+        "a longer string with spaces and all that",
+        "a string with a \n newline",
+        "スタリング");
+
+    @Test
+    public void testDecodeEncodeEquals() throws Exception {
+        for (String value : TEST_VALUES) {
+            coderDecodeEncodeEqual(TEST_CODER, value);
+        }
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestVarIntCoder.java b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestVarIntCoder.java
new file mode 100644
index 0000000..9ce7437
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/coder/TestVarIntCoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.coder;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit Tests for {@link VarIntCoder}.
+ */
+public class TestVarIntCoder extends CoderBasicTestCase {
+
+    private static final Coder<Integer> TEST_CODER = VarIntCoder.of();
+
+    private static final List<Integer> TEST_VALUES = Arrays.asList(
+        -11, -3, -1, 0, 1, 5, 13, 29,
+        Integer.MAX_VALUE,
+        Integer.MIN_VALUE);
+
+    @Test
+    public void testDecodeEncodeEquals() throws Exception {
+        for (Integer value : TEST_VALUES) {
+            coderDecodeEncodeEqual(TEST_CODER, value);
+        }
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/util/TestVarInt.java b/stream/common/src/test/java/org/apache/bookkeeper/common/util/TestVarInt.java
new file mode 100644
index 0000000..0e6d1b6
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/util/TestVarInt.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test Case for {@link VarInt}.
+ */
+public class TestVarInt {
+    @Rule
+    public final ExpectedException thrown = ExpectedException.none();
+
+    // Long values to check for boundary cases.
+    private static final long[] LONG_VALUES = {
+        0,
+        1,
+        127,
+        128,
+        16383,
+        16384,
+        2097151,
+        2097152,
+        268435455,
+        268435456,
+        34359738367L,
+        34359738368L,
+        9223372036854775807L,
+        -9223372036854775808L,
+        -1,
+    };
+
+    // VarInt encoding of the above VALUES.
+    private static final byte[][] LONG_ENCODED = {
+        // 0
+        {0x00},
+        // 1
+        {0x01},
+        // 127
+        {0x7f},
+        // 128
+        {(byte) 0x80, 0x01},
+        // 16383
+        {(byte) 0xff, 0x7f},
+        // 16384
+        {(byte) 0x80, (byte) 0x80, 0x01},
+        // 2097151
+        {(byte) 0xff, (byte) 0xff, 0x7f},
+        // 2097152
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01},
+        // 268435455
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, 0x7f},
+        // 268435456
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01},
+        // 34359738367
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x7f},
+        // 34359738368
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80,
+            0x01},
+        // 9223372036854775807
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0x7f},
+        // -9223372036854775808L
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80,
+            (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01},
+        // -1
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x01}
+    };
+
+    // Integer values to check for boundary cases.
+    private static final int[] INT_VALUES = {
+        0,
+        1,
+        127,
+        128,
+        16383,
+        16384,
+        2097151,
+        2097152,
+        268435455,
+        268435456,
+        2147483647,
+        -2147483648,
+        -1,
+    };
+
+    // VarInt encoding of the above VALUES.
+    private static final byte[][] INT_ENCODED = {
+        // 0
+        {(byte) 0x00},
+        // 1
+        {(byte) 0x01},
+        // 127
+        {(byte) 0x7f},
+        // 128
+        {(byte) 0x80, (byte) 0x01},
+        // 16383
+        {(byte) 0xff, (byte) 0x7f},
+        // 16384
+        {(byte) 0x80, (byte) 0x80, (byte) 0x01},
+        // 2097151
+        {(byte) 0xff, (byte) 0xff, (byte) 0x7f},
+        // 2097152
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x01},
+        // 268435455
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0x7f},
+        // 268435456
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x01},
+        // 2147483647
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0x07},
+        // -2147483648
+        {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x08},
+        // -1
+        {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0x0f}
+    };
+
+    private static byte[] encodeInt(int v) throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        VarInt.encode(v, stream);
+        return stream.toByteArray();
+    }
+
+    private static byte[] encodeLong(long v) throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        VarInt.encode(v, stream);
+        return stream.toByteArray();
+    }
+
+    private static int decodeInt(byte[] encoded) throws IOException {
+        ByteArrayInputStream stream = new ByteArrayInputStream(encoded);
+        return VarInt.decodeInt(stream);
+    }
+
+    private static long decodeLong(byte[] encoded) throws IOException {
+        ByteArrayInputStream stream = new ByteArrayInputStream(encoded);
+        return VarInt.decodeLong(stream);
+    }
+
+    @Test
+    public void decodeValues() throws IOException {
+        assertEquals(LONG_VALUES.length, LONG_ENCODED.length);
+        for (int i = 0; i < LONG_ENCODED.length; ++i) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(LONG_ENCODED[i]);
+            long parsed = VarInt.decodeLong(stream);
+            assertEquals(LONG_VALUES[i], parsed);
+            assertEquals(-1, stream.read());
+        }
+
+        assertEquals(INT_VALUES.length, INT_ENCODED.length);
+        for (int i = 0; i < INT_ENCODED.length; ++i) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(INT_ENCODED[i]);
+            int parsed = VarInt.decodeInt(stream);
+            assertEquals(INT_VALUES[i], parsed);
+            assertEquals(-1, stream.read());
+        }
+    }
+
+    @Test
+    public void encodeValuesAndGetLength() throws IOException {
+        assertEquals(LONG_VALUES.length, LONG_ENCODED.length);
+        for (int i = 0; i < LONG_VALUES.length; ++i) {
+            byte[] encoded = encodeLong(LONG_VALUES[i]);
+            assertThat(encoded, equalTo(LONG_ENCODED[i]));
+            assertEquals(LONG_ENCODED[i].length, VarInt.getLength(LONG_VALUES[i]));
+        }
+
+        assertEquals(INT_VALUES.length, INT_ENCODED.length);
+        for (int i = 0; i < INT_VALUES.length; ++i) {
+            byte[] encoded = encodeInt(INT_VALUES[i]);
+            assertThat(encoded, equalTo(INT_ENCODED[i]));
+            assertEquals(INT_ENCODED[i].length, VarInt.getLength(INT_VALUES[i]));
+        }
+    }
+
+    @Test
+    public void decodeThrowsExceptionForOverflow() throws IOException {
+        final byte[] tooLargeNumber =
+            {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x02};
+
+        thrown.expect(IOException.class);
+        decodeLong(tooLargeNumber);
+    }
+
+    @Test
+    public void decodeThrowsExceptionForIntOverflow() throws IOException {
+        byte[] encoded = encodeLong(1L << 32);
+
+        thrown.expect(IOException.class);
+        decodeInt(encoded);
+    }
+
+    @Test
+    public void decodeThrowsExceptionForIntUnderflow() throws IOException {
+        byte[] encoded = encodeLong(-1);
+
+        thrown.expect(IOException.class);
+        decodeInt(encoded);
+    }
+
+    @Test
+    public void decodeThrowsExceptionForNonterminated() throws IOException {
+        final byte[] nonTerminatedNumber =
+            {(byte) 0xff, (byte) 0xff};
+
+        thrown.expect(IOException.class);
+        decodeLong(nonTerminatedNumber);
+    }
+
+    @Test
+    public void decodeParsesEncodedValues() throws IOException {
+        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        for (int i = 10; i < Integer.MAX_VALUE; i = (int) (i * 1.1)) {
+            VarInt.encode(i, outStream);
+            VarInt.encode(-i, outStream);
+        }
+        for (long i = 10; i < Long.MAX_VALUE; i = (long) (i * 1.1)) {
+            VarInt.encode(i, outStream);
+            VarInt.encode(-i, outStream);
+        }
+
+        ByteArrayInputStream inStream =
+            new ByteArrayInputStream(outStream.toByteArray());
+        for (int i = 10; i < Integer.MAX_VALUE; i = (int) (i * 1.1)) {
+            assertEquals(i, VarInt.decodeInt(inStream));
+            assertEquals(-i, VarInt.decodeInt(inStream));
+        }
+        for (long i = 10; i < Long.MAX_VALUE; i = (long) (i * 1.1)) {
+            assertEquals(i, VarInt.decodeLong(inStream));
+            assertEquals(-i, VarInt.decodeLong(inStream));
+        }
+    }
+
+    @Test
+    public void endOfFileThrowsException() throws Exception {
+        ByteArrayInputStream inStream =
+            new ByteArrayInputStream(new byte[0]);
+        thrown.expect(EOFException.class);
+        VarInt.decodeInt(inStream);
+    }
+
+    @Test
+    public void unterminatedThrowsException() throws Exception {
+        byte[] e = encodeLong(Long.MAX_VALUE);
+        byte[] s = new byte[1];
+        s[0] = e[0];
+        ByteArrayInputStream inStream = new ByteArrayInputStream(s);
+        thrown.expect(IOException.class);
+        VarInt.decodeInt(inStream);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.