You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/06 16:05:37 UTC

[1/2] beam git commit: OrderedCode library provides encoding a sequence of typed entities into a byte array that can be lexicographically sorted.

Repository: beam
Updated Branches:
  refs/heads/master 63b54a5b0 -> 480e60fa4


OrderedCode library provides encoding a sequence of typed entities into a byte array that can be lexicographically sorted.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/57bde534
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/57bde534
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/57bde534

Branch: refs/heads/master
Commit: 57bde5340ffc94735ec374c88ef5374323d6321b
Parents: 63b54a5
Author: Mairbek Khadikov <ma...@google.com>
Authored: Fri Sep 29 16:43:05 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Oct 6 09:05:05 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/OrderedCode.java    | 764 ++++++++++++++++
 .../sdk/io/gcp/spanner/OrderedCodeTest.java     | 890 +++++++++++++++++++
 2 files changed, 1654 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/57bde534/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java
new file mode 100644
index 0000000..80290d6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedInteger;
+
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * This module provides routines for encoding a sequence of typed
+ * entities into a byte array.  The resulting byte arrays can be
+ * lexicographically compared to yield the same comparison value that
+ * would have been generated if the encoded items had been compared
+ * one by one according to their type.
+ *
+ * <p>More precisely, suppose:
+ *  1. byte array A is generated by encoding the sequence of items [A_1..A_n]
+ *  2. byte array B is generated by encoding the sequence of items [B_1..B_n]
+ *  3. The types match; i.e., for all i: A_i was encoded using
+ *     the same routine as B_i
+ *
+ * <p>Then:
+ *    Comparing A vs. B lexicographically is the same as comparing
+ *    the vectors [A_1..A_n] and [B_1..B_n] lexicographically.
+ *
+ * <p><b>This class is NOT thread safe.</b>
+ */
+class OrderedCode {
+  // We want to encode a few extra symbols in strings:
+  //      <sep>           Separator between items
+  //      <infinity>      Infinite string
+  //
+  // Therefore we need an alphabet with at least 258 characters.  We
+  // achieve this by using two-letter sequences starting with '\0' and '\xff'
+  // as extra symbols:
+  //      <sep>           encoded as =>           \0\1
+  //      \0              encoded as =>           \0\xff
+  //      \xff            encoded as =>           \xff\x00
+  //      <infinity>      encoded as =>           \xff\xff
+  //
+  // The remaining two letter sequences starting with '\0' and '\xff'
+  // are currently unused.
+
+  static final byte ESCAPE1        = 0x00;
+  static final byte NULL_CHARACTER =
+      (byte) 0xff;                                  // Combined with ESCAPE1
+  static final byte SEPARATOR      = 0x01;   // Combined with ESCAPE1
+
+  static final byte ESCAPE2        = (byte) 0xff;
+  static final byte INFINITY       =
+      (byte) 0xff;                                  // Combined with ESCAPE2
+  static final byte FF_CHARACTER   = 0x00;   // Combined with ESCAPE2
+
+  static final byte[] ESCAPE1_SEPARATOR = { ESCAPE1, SEPARATOR };
+
+  static final byte[] INFINITY_ENCODED = { ESCAPE2, INFINITY };
+
+  static final byte[] INFINITY_ENCODED_DECREASING = {invert(ESCAPE2), invert(INFINITY)};
+
+  /**
+   * This array maps encoding length to header bits in the first two bytes for
+   * SignedNumIncreasing encoding.
+   */
+  private static final byte[][] LENGTH_TO_HEADER_BITS = {
+      { 0, 0 },
+      { (byte) 0x80, 0 },
+      { (byte) 0xc0, 0 },
+      { (byte) 0xe0, 0 },
+      { (byte) 0xf0, 0 },
+      { (byte) 0xf8, 0 },
+      { (byte) 0xfc, 0 },
+      { (byte) 0xfe, 0 },
+      { (byte) 0xff, 0 },
+      { (byte) 0xff, (byte) 0x80 },
+      { (byte) 0xff, (byte) 0xc0 }
+  };
+
+  /**
+   * This array maps encoding lengths to the header bits that overlap with
+   * the payload and need fixing during readSignedNumIncreasing.
+   */
+  private static final long[] LENGTH_TO_MASK = {
+      0L,
+      0x80L,
+      0xc000L,
+      0xe00000L,
+      0xf0000000L,
+      0xf800000000L,
+      0xfc0000000000L,
+      0xfe000000000000L,
+      0xff00000000000000L,
+      0x8000000000000000L,
+      0L
+  };
+
+  /**
+   * This array maps the number of bits in a number to the encoding
+   * length produced by writeSignedNumIncreasing.
+   * For positive numbers, the number of bits is 1 plus the most significant
+   * bit position (the highest bit position in a positive long is 63).
+   * For a negative number n, we count the bits in ~n.
+   * That is, length = BITS_TO_LENGTH[log2Floor(n < 0 ? ~n : n) + 1].
+   */
+  private static final short[] BITS_TO_LENGTH = {
+      1, 1, 1, 1, 1, 1, 1,
+      2, 2, 2, 2, 2, 2, 2,
+      3, 3, 3, 3, 3, 3, 3,
+      4, 4, 4, 4, 4, 4, 4,
+      5, 5, 5, 5, 5, 5, 5,
+      6, 6, 6, 6, 6, 6, 6,
+      7, 7, 7, 7, 7, 7, 7,
+      8, 8, 8, 8, 8, 8, 8,
+      9, 9, 9, 9, 9, 9, 9,
+      10
+  };
+
+  // stores the current encoded value as a list of byte arrays. Note that this
+  // is manipulated as we read/write items.
+  // Note that every item will fit on at most one array. One array may
+  // have more than one item (eg when used for decoding). While encoding,
+  // one array will have exactly one item. While returning the encoded array
+  // we will merge all the arrays in this list.
+  private final ArrayList<byte[]> encodedArrays = new ArrayList<byte[]>();
+
+  // This is the current position on the first array. Will be non-zero
+  // only if the ordered code was created using encoded byte array.
+  private int firstArrayPosition = 0;
+
+  /**
+   * Creates OrderedCode from scratch. Typically used at encoding time.
+   */
+  public OrderedCode(){
+  }
+
+  /**
+   * Creates OrderedCode from a given encoded byte array. Typically used at
+   * decoding time.
+   *
+   * <p><b> For better performance, it uses the input array provided (not a copy).
+   * Therefore the input array should not be modified.</b>
+   */
+  public OrderedCode(byte[] encodedByteArray) {
+    encodedArrays.add(encodedByteArray);
+  }
+
+  /**
+   * Adds the given byte array item to the OrderedCode. It encodes the input
+   * byte array, followed by a separator and appends the result to its
+   * internal encoded byte array store.
+   *
+   * <p>It works with the input array,
+   * so the input array 'value' should not be modified till the method returns.
+   *
+   * @param value bytes to be written.
+   * @see #readBytes()
+   */
+  public void writeBytes(byte[] value) {
+    writeBytes(value, false);
+  }
+
+  public void writeBytesDecreasing(byte[] value) {
+    writeBytes(value, true);
+  }
+
+  private void writeBytes(byte[] value, boolean invert) {
+    // Determine the length of the encoded array
+    int encodedLength = 2;      // for separator
+    for (byte b : value) {
+      if ((b == ESCAPE1) || (b == ESCAPE2)) {
+        encodedLength += 2;
+      } else {
+        encodedLength++;
+      }
+    }
+
+    byte[] encodedArray = new byte[encodedLength];
+    int copyStart = 0;
+    int outIndex = 0;
+    for (int i = 0; i < value.length; i++) {
+      byte b = value[i];
+      if (b == ESCAPE1) {
+        arraycopy(invert, value, copyStart, encodedArray, outIndex,
+            i - copyStart);
+        outIndex += i - copyStart;
+        encodedArray[outIndex++] = convert(invert, ESCAPE1);
+        encodedArray[outIndex++] = convert(invert, NULL_CHARACTER);
+        copyStart = i + 1;
+      } else if (b == ESCAPE2) {
+        arraycopy(invert, value, copyStart, encodedArray, outIndex,
+            i - copyStart);
+        outIndex += i - copyStart;
+        encodedArray[outIndex++] = convert(invert, ESCAPE2);
+        encodedArray[outIndex++] = convert(invert, FF_CHARACTER);
+        copyStart = i + 1;
+      }
+    }
+    if (copyStart < value.length) {
+      arraycopy(invert, value, copyStart, encodedArray, outIndex,
+          value.length - copyStart);
+      outIndex += value.length - copyStart;
+    }
+    encodedArray[outIndex++] = convert(invert, ESCAPE1);
+    encodedArray[outIndex] = convert(invert, SEPARATOR);
+
+    encodedArrays.add(encodedArray);
+  }
+
+  private static byte convert(boolean invert, byte val) {
+    return invert ? (byte) ~val : val;
+  }
+
+  private static byte invert(byte val) {
+    return convert(true, val);
+  }
+
+  private void arraycopy(
+      boolean invert, byte[] src, int srcPos, byte[] dest, int destPos, int length) {
+    System.arraycopy(src, srcPos, dest, destPos, length);
+    if (invert) {
+      for (int i = destPos; i < destPos + length; i++) {
+        dest[i] = (byte) ~dest[i];
+      }
+    }
+  }
+
+  /**
+   * Encodes the long item, in big-endian format, and appends the result to its
+   * internal encoded byte array store.
+   *
+   * @see #readNumIncreasing()
+   */
+  public void writeNumIncreasing(long value) {
+    // Values are encoded with a single byte length prefix, followed
+    // by the actual value in big-endian format with leading 0 bytes
+    // dropped.
+    byte[] bufer = new byte[9];  // 8 bytes for value plus one byte for length
+    int len = 0;
+    while (value != 0) {
+      len++;
+      bufer[9 - len] = (byte) (value & 0xff);
+      value >>>= 8;
+    }
+    bufer[9 - len - 1] = (byte) len;
+    len++;
+    byte[] encodedArray = new byte[len];
+    System.arraycopy(bufer, 9 - len, encodedArray, 0, len);
+    encodedArrays.add(encodedArray);
+  }
+
+  public void writeNumIncreasing(UnsignedInteger unsignedInt) {
+    writeNumIncreasing(unsignedInt.longValue());
+  }
+
+  /**
+   * Encodes the long item as a complemented length byte followed by the complement of its
+   * big-endian bytes, and appends the result to its internal encoded byte array store.
+   *
+   * @see #readNumDecreasing()
+   */
+  public void writeNumDecreasing(long value) {
+    // Values are encoded with a complemented single byte length prefix,
+    // followed by the complement of the actual value in big-endian format with
+    // leading 0xff bytes dropped.
+    byte[] bufer = new byte[9];  // 8 bytes for value plus one byte for length
+    int len = 0;
+    while (value != 0) {
+      len++;
+      bufer[9 - len] = (byte) ~(value & 0xff);
+      value >>>= 8;
+    }
+    bufer[9 - len - 1] = (byte) ~len;
+    len++;
+    byte[] encodedArray = new byte[len];
+    System.arraycopy(bufer, 9 - len, encodedArray, 0, len);
+    encodedArrays.add(encodedArray);
+  }
+
+  public void writeNumDecreasing(UnsignedInteger unsignedInt) {
+    writeNumDecreasing(unsignedInt.longValue());
+  }
+
+  /**
+   * Return floor(log2(n)) for positive integer n.  Returns -1 iff n == 0.
+   *
+   */
+  @VisibleForTesting
+  int log2Floor(long n) {
+    checkArgument(n >= 0);
+    return n == 0 ? -1 : LongMath.log2(n, RoundingMode.FLOOR);
+  }
+
+  /**
+   * Calculates the encoding length in bytes of the signed number n.
+   */
+  @VisibleForTesting
+  int getSignedEncodingLength(long n) {
+    return BITS_TO_LENGTH[log2Floor(n < 0 ? ~n : n) + 1];
+  }
+
+  /**
+   * @see #readSignedNumIncreasing()
+   */
+  public void writeSignedNumIncreasing(long val) {
+    long x = val < 0 ? ~val : val;
+    if (x < 64) {  // Fast path for encoding length == 1.
+      byte[] encodedArray = new byte[] { (byte) (LENGTH_TO_HEADER_BITS[1][0] ^ val) };
+      encodedArrays.add(encodedArray);
+      return;
+    }
+    // buf = val in network byte order, sign extended to 10 bytes.
+    byte signByte = val < 0 ? (byte) 0xff : 0;
+    byte[] buf = new byte[2 + Longs.BYTES];
+    buf[0] = buf[1] = signByte;
+    System.arraycopy(Longs.toByteArray(val), 0, buf, 2, Longs.BYTES);
+    int len = getSignedEncodingLength(x);
+    if (len < 2) {
+      throw new IllegalStateException(
+          "Invalid length (" + len + ")" + " returned by getSignedEncodingLength(" + x + ")");
+    }
+    int beginIndex = buf.length - len;
+    buf[beginIndex] ^= LENGTH_TO_HEADER_BITS[len][0];
+    buf[beginIndex + 1] ^= LENGTH_TO_HEADER_BITS[len][1];
+
+    byte[] encodedArray = new byte[len];
+    System.arraycopy(buf, beginIndex, encodedArray, 0, len);
+    encodedArrays.add(encodedArray);
+  }
+
+  public void writeSignedNumDecreasing(long val) {
+    writeSignedNumIncreasing(~val);
+  }
+
+  /**
+   * Encodes and appends INFINITY item to its internal encoded byte array
+   * store.
+   *
+   * @see #readInfinity()
+   */
+  public void writeInfinity() {
+    writeTrailingBytes(INFINITY_ENCODED);
+  }
+
+  /**
+   * Encodes and appends INFINITY item which would come before any real string.
+   *
+   * @see #readInfinityDecreasing()
+   */
+  public void writeInfinityDecreasing() {
+    writeTrailingBytes(INFINITY_ENCODED_DECREASING);
+  }
+
+  /**
+   * Appends the byte array item to its internal encoded byte array
+   * store. This is used for the last item and is not encoded.
+   *
+   * <p>It stores the input array in the store,
+   * so the input array 'value' should not be modified.
+   *
+   * @param value bytes to be written.
+   * @see #readTrailingBytes()
+   */
+  public void writeTrailingBytes(byte[] value) {
+    if ((value == null) || (value.length == 0)) {
+      throw new IllegalArgumentException(
+          "Value cannot be null or have 0 elements");
+    }
+
+    encodedArrays.add(value);
+  }
+
+  /**
+   * Returns the next byte array item from its encoded byte array store and
+   * removes the item from the store.
+   *
+   * @see #writeBytes(byte[])
+   */
+  public byte[] readBytes() {
+    return readBytes(false);
+  }
+
+  public byte[] readBytesDecreasing() {
+    return readBytes(true);
+  }
+
+  private byte[] readBytes(boolean invert) {
+    if ((encodedArrays == null) || (encodedArrays.size() == 0) || (
+        (encodedArrays.get(0)).length - firstArrayPosition <= 0)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    // Determine the length of the decoded array
+    // We only scan up to "length-2" since a valid string must end with
+    // a two character terminator: 'ESCAPE1 SEPARATOR'
+    byte[] store = encodedArrays.get(0);
+    int decodedLength = 0;
+    boolean valid = false;
+    int i = firstArrayPosition;
+    while (i < store.length - 1) {
+      byte b = store[i++];
+      if (b == convert(invert, ESCAPE1)) {
+        b = store[i++];
+        if (b == convert(invert, SEPARATOR)) {
+          valid = true;
+          break;
+        } else if (b == convert(invert, NULL_CHARACTER)) {
+          decodedLength++;
+        } else {
+          throw new IllegalArgumentException("Invalid encoded byte array");
+        }
+      } else if (b == convert(invert, ESCAPE2)) {
+        b = store[i++];
+        if (b == convert(invert, FF_CHARACTER)) {
+          decodedLength++;
+        } else {
+          throw new IllegalArgumentException("Invalid encoded byte array");
+        }
+      } else {
+        decodedLength++;
+      }
+    }
+    if (!valid) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    byte[] decodedArray = new byte[decodedLength];
+    int copyStart = firstArrayPosition;
+    int outIndex = 0;
+    int j = firstArrayPosition;
+    while (j < store.length - 1) {
+      byte b = store[j++];   // note that j has been incremented
+      if (b == convert(invert, ESCAPE1)) {
+        arraycopy(invert, store, copyStart, decodedArray, outIndex, j - copyStart - 1);
+        outIndex += j - copyStart - 1;
+        // ESCAPE1 SEPARATOR ends component
+        // ESCAPE1 NULL_CHARACTER represents '\0'
+        b = store[j++];
+        if (b == convert(invert, SEPARATOR)) {
+          if ((store.length - j) == 0) {
+            // we are done with the first array
+            encodedArrays.remove(0);
+            firstArrayPosition = 0;
+          } else {
+            firstArrayPosition = j;
+          }
+          return decodedArray;
+        } else if (b == convert(invert, NULL_CHARACTER)) {
+          decodedArray[outIndex++] = 0x00;
+        }   // else not required - handled during length determination
+        copyStart = j;
+      } else if (b == convert(invert, ESCAPE2)) {
+        arraycopy(invert, store, copyStart, decodedArray, outIndex, j - copyStart - 1);
+        outIndex += j - copyStart - 1;
+        // ESCAPE2 FF_CHARACTER represents '\xff'
+        // ESCAPE2 INFINITY is an error
+        b = store[j++];
+        if (b == convert(invert, FF_CHARACTER)) {
+          decodedArray[outIndex++] = (byte) 0xff;
+        }   // else not required - handled during length determination
+        copyStart = j;
+      }
+    }
+    // not required due to the first phase, but need to entertain the compiler
+    throw new IllegalArgumentException("Invalid encoded byte array");
+  }
+
+  /**
+   * Returns the next long item (encoded in big-endian format via
+   * {@code writeNumIncreasing(long)}) from its internal encoded byte array store and
+   * removes it; throws IllegalArgumentException if the store is empty or malformed.
+   *
+   * @see #writeNumIncreasing(long)
+   */
+  public long readNumIncreasing() {
+    if ((encodedArrays == null) || (encodedArrays.size() == 0) || (
+        (encodedArrays.get(0)).length - firstArrayPosition < 1)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    byte[] store = encodedArrays.get(0);
+    // Decode length byte as unsigned, so a corrupt prefix >= 0x80 is rejected, not read as < 0.
+    int len = store[firstArrayPosition] & 0xff;
+    if ((firstArrayPosition + len + 1 > store.length) || len > 8) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    long result = 0;
+    for (int i = 0; i < len; i++) {
+      result <<= 8;
+      result |= (store[firstArrayPosition + i + 1] & 0xff);
+    }
+
+    if ((store.length - firstArrayPosition - len - 1) == 0) {
+      // we are done with the first array
+      encodedArrays.remove(0);
+      firstArrayPosition = 0;
+    } else {
+      firstArrayPosition = firstArrayPosition + len + 1;
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns the next long item (encoded in big-endian format via
+   * {@code writeNumDecreasing(long)}) from its internal encoded byte array
+   * store and removes the item from the store.
+   *
+   * @see #writeNumDecreasing(long)
+   */
+  public long readNumDecreasing() {
+    if ((encodedArrays == null) || (encodedArrays.size() == 0)
+        || ((encodedArrays.get(0)).length - firstArrayPosition < 1)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    byte[] store = encodedArrays.get(0);
+    // Decode length byte
+    int len = ~store[firstArrayPosition] & 0xff;
+    if ((firstArrayPosition + len + 1 > store.length) || len > 8) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    long result = 0;
+    for (int i = 0; i < len; i++) {
+      result <<= 8;
+      result |= (~store[firstArrayPosition + i + 1] & 0xff);
+    }
+
+    if ((store.length - firstArrayPosition - len - 1) == 0) {
+      // we are done with the first array
+      encodedArrays.remove(0);
+      firstArrayPosition = 0;
+    } else {
+      firstArrayPosition = firstArrayPosition + len + 1;
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns the next long item (encoded via
+   * {@code writeSignedNumIncreasing(long)}) from its internal encoded byte
+   * array store and removes the item from the store.
+   *
+   * @see #writeSignedNumIncreasing(long)
+   */
+  public long readSignedNumIncreasing() {
+    if ((encodedArrays == null) || (encodedArrays.size() == 0) || (
+        (encodedArrays.get(0)).length - firstArrayPosition < 1)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    byte[] store = encodedArrays.get(0);
+
+    long xorMask = ((store[firstArrayPosition] & 0x80) == 0) ? ~0L : 0L;
+    // Store first byte as an int rather than a (signed) byte -- to avoid
+    // accidental byte-to-int promotion later which would extend the byte's
+    // sign bit (if any).
+    int firstByte = (store[firstArrayPosition] & 0xff) ^ (int) (xorMask & 0xff);
+
+    // Now calculate and test length, and set x to raw (unmasked) result.
+    int len;
+    long x;
+    if (firstByte != 0xff) {
+      len = 7 - log2Floor(firstByte ^ 0xff);
+      if (store.length - firstArrayPosition < len) {
+        throw new IllegalArgumentException("Invalid encoded byte array");
+      }
+      x = xorMask;  // Sign extend using xorMask.
+      for (int i = firstArrayPosition; i < firstArrayPosition + len; i++) {
+        x = (x << 8) | (store[i] & 0xff);
+      }
+    } else {
+      len = 8;
+      if (store.length - firstArrayPosition < len) {
+        throw new IllegalArgumentException("Invalid encoded byte array");
+      }
+      int secondByte = (store[firstArrayPosition + 1] & 0xff) ^ (int) (xorMask & 0xff);
+      if (secondByte >= 0x80) {
+        if (secondByte < 0xc0) {
+          len = 9;
+        } else {
+          int thirdByte = (store[firstArrayPosition + 2] & 0xff) ^ (int) (xorMask & 0xff);
+          if (secondByte == 0xc0 && thirdByte < 0x80) {
+            len = 10;
+          } else {
+            // Either len > 10 or len == 10 and #bits > 63.
+            throw new IllegalArgumentException("Invalid encoded byte array");
+          }
+        }
+        if (store.length - firstArrayPosition < len) {
+          throw new IllegalArgumentException("Invalid encoded byte array");
+        }
+      }
+      x = Longs.fromByteArray(
+          Arrays.copyOfRange(store, firstArrayPosition + len - 8, firstArrayPosition + len));
+    }
+
+    x ^= LENGTH_TO_MASK[len];  // Remove spurious header bits.
+
+    if (len != getSignedEncodingLength(x)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+
+    if ((store.length - firstArrayPosition - len) == 0) {
+      // We are done with the first array.
+      encodedArrays.remove(0);
+      firstArrayPosition = 0;
+    } else {
+      firstArrayPosition = firstArrayPosition + len;
+    }
+
+    return x;
+  }
+
+  public long readSignedNumDecreasing() {
+    return ~readSignedNumIncreasing();
+  }
+
+  /**
+   * Removes INFINITY item from its internal encoded byte array store
+   * if present.  Returns whether INFINITY was present.
+   *
+   * @see #writeInfinity()
+   */
+  public boolean readInfinity() {
+    return readInfinityInternal(INFINITY_ENCODED);
+  }
+
+  /**
+   * Removes INFINITY item from its internal encoded byte array store if present. Returns whether
+   * INFINITY was present.
+   *
+   * @see #writeInfinityDecreasing()
+   */
+  public boolean readInfinityDecreasing() {
+    return readInfinityInternal(INFINITY_ENCODED_DECREASING);
+  }
+
+  private boolean readInfinityInternal(byte[] codes) {
+    if ((encodedArrays == null) || (encodedArrays.size() == 0)
+        || ((encodedArrays.get(0)).length - firstArrayPosition < 1)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+    byte[] store = encodedArrays.get(0);
+    if (store.length - firstArrayPosition < 2) {
+      return false;
+    }
+    if ((store[firstArrayPosition] == codes[0]) && (store[firstArrayPosition + 1] == codes[1])) {
+      if ((store.length - firstArrayPosition - 2) == 0) {
+        // we are done with the first array
+        encodedArrays.remove(0);
+        firstArrayPosition = 0;
+      } else {
+        firstArrayPosition = firstArrayPosition + 2;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Returns a copy of the unread remainder of the store as the trailing item and empties the
+   * store; throws IllegalArgumentException unless the store holds exactly one byte array.
+   *
+   * @see #writeTrailingBytes(byte[])
+   */
+  public byte[] readTrailingBytes() {
+    // one item is contained within one byte array
+    if ((encodedArrays == null) || (encodedArrays.size() != 1)) {
+      throw new IllegalArgumentException("Invalid encoded byte array");
+    }
+    byte[] store = encodedArrays.remove(0);
+    assert encodedArrays.size() == 0;
+    byte[] trailing = Arrays.copyOfRange(store, firstArrayPosition, store.length);
+    firstArrayPosition = 0;  // reset as other reads do; a stale offset corrupts later writes
+    return trailing;
+  }
+
+  /**
+   * Returns the encoded bytes that represents the current state of the
+   * OrderedCode.
+   *
+   * <p><b> NOTE: This method returns OrderedCode's internal array (not a
+   * copy) for better performance. Therefore the returned array should not be
+   * modified.</b>
+   */
+  public byte[] getEncodedBytes() {
+    if (encodedArrays.size() == 0) {
+      return new byte[0];
+    }
+    if ((encodedArrays.size() == 1) && (firstArrayPosition == 0)) {
+      return encodedArrays.get(0);
+    }
+
+    int totalLength = 0;
+
+    for (int i = 0; i < encodedArrays.size(); i++) {
+      byte[] bytes = encodedArrays.get(i);
+      if (i == 0) {
+        totalLength += bytes.length - firstArrayPosition;
+      } else {
+        totalLength += bytes.length;
+      }
+    }
+
+    byte[] encodedBytes = new byte[totalLength];
+    int destPos = 0;
+    for (int i = 0; i < encodedArrays.size(); i++) {
+      byte[] bytes = encodedArrays.get(i);
+      if (i == 0) {
+        System.arraycopy(bytes, firstArrayPosition, encodedBytes, destPos,
+            bytes.length - firstArrayPosition);
+        destPos += bytes.length - firstArrayPosition;
+      } else {
+        System.arraycopy(bytes, 0, encodedBytes, destPos, bytes.length);
+        destPos += bytes.length;
+      }
+    }
+
+    // replace the store with merged array, so that repeated calls
+    // don't need to merge. The reads can handle both the versions.
+    encodedArrays.clear();
+    encodedArrays.add(encodedBytes);
+    firstArrayPosition = 0;
+
+    return encodedBytes;
+  }
+
+  /**
+   * Returns true if it has more encoded bytes that haven't been read,
+   * false otherwise.  Return value of true doesn't imply anything about
+   * validity of remaining data.
+   * @return true if it has more encoded bytes that haven't been read,
+   * false otherwise.
+   */
+  public boolean hasRemainingEncodedBytes() {
+    // We delete an array after fully consuming it.
+    return encodedArrays != null && encodedArrays.size() != 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/57bde534/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java
new file mode 100644
index 0000000..5be4826
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedInteger;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A set of unit tests to verify {@link OrderedCode}.
+ */
+@RunWith(JUnit4.class)
+public class OrderedCodeTest {
+  /**
+   * Data for a generic coding test case with known encoded outputs.
+   *
+   * <p>Subclasses provide the encode and decode calls for the value type
+   * under test; the expected encodings are given as strings of hex digits.
+   */
+  abstract static class CodingTestCase<T> {
+    /** The test value. */
+    abstract T value();
+
+    /**
+     * Test value's encoding in increasing order (obtained from the C++
+     * implementation).
+     */
+    abstract String increasingBytes();
+
+    /**
+     * Test value's encoding in decreasing order (obtained from the C++
+     * implementation).
+     */
+    abstract String decreasingBytes();
+
+    // Helper methods to implement in concrete classes.
+
+    abstract byte[] encodeIncreasing();
+    abstract byte[] encodeDecreasing();
+
+    T decodeIncreasing() {  // decodes the bytes given by increasingBytes()
+      return decodeIncreasing(
+          new OrderedCode(bytesFromHexString(increasingBytes())));
+    }
+
+    T decodeDecreasing() {  // decodes the bytes given by decreasingBytes()
+      return decodeDecreasing(
+          new OrderedCode(bytesFromHexString(decreasingBytes())));
+    }
+
+    abstract T decodeIncreasing(OrderedCode orderedCode);
+    abstract T decodeDecreasing(OrderedCode orderedCode);
+  }
+
+  /**
+   * Known-answer test cases for numbers written with
+   * {@code writeNumIncreasing} and {@code writeNumDecreasing}.
+   */
+  @AutoValue
+  abstract static class UnsignedNumber extends CodingTestCase<Long> {
+    /** Builds a test case from a value and its two expected hex encodings. */
+    private static UnsignedNumber testCase(
+        long value, String increasingBytes, String decreasingBytes) {
+      return new AutoValue_OrderedCodeTest_UnsignedNumber(
+          value, increasingBytes, decreasingBytes);
+    }
+
+    /** Test cases for unsigned numbers, in increasing (unsigned) order by value. */
+    private static final ImmutableList<UnsignedNumber> TEST_CASES =
+        ImmutableList.of(
+            testCase(0, "00", "ff"),
+            testCase(1, "0101", "fefe"),
+            testCase(33, "0121", "fede"),
+            testCase(55000, "02d6d8", "fd2927"),
+            testCase(Integer.MAX_VALUE, "047fffffff", "fb80000000"),
+            testCase(Long.MAX_VALUE, "087fffffffffffffff", "f78000000000000000"),
+            testCase(Long.MIN_VALUE, "088000000000000000", "f77fffffffffffffff"),
+            testCase(-100, "08ffffffffffffff9c", "f70000000000000063"),
+            testCase(-1, "08ffffffffffffffff", "f70000000000000000"));
+
+    @Override
+    byte[] encodeIncreasing() {
+      OrderedCode encoder = new OrderedCode();
+      encoder.writeNumIncreasing(value());
+      return encoder.getEncodedBytes();
+    }
+
+    @Override
+    byte[] encodeDecreasing() {
+      OrderedCode encoder = new OrderedCode();
+      encoder.writeNumDecreasing(value());
+      return encoder.getEncodedBytes();
+    }
+
+    @Override
+    Long decodeIncreasing(OrderedCode orderedCode) {
+      return orderedCode.readNumIncreasing();
+    }
+
+    @Override
+    Long decodeDecreasing(OrderedCode orderedCode) {
+      return orderedCode.readNumDecreasing();
+    }
+  }
+
+  @AutoValue
+  abstract static class BytesTest extends CodingTestCase<String> {
+    @Override
+    byte[] encodeIncreasing() {
+      OrderedCode orderedCode = new OrderedCode();
+      orderedCode.writeBytes(bytesFromHexString(value()));
+      return orderedCode.getEncodedBytes();
+    }
+
+    @Override
+    byte[] encodeDecreasing() {
+      OrderedCode orderedCode = new OrderedCode();
+      orderedCode.writeBytesDecreasing(bytesFromHexString(value()));
+      return orderedCode.getEncodedBytes();
+    }
+
+    @Override
+    String decodeIncreasing(OrderedCode orderedCode) {
+      return bytesToHexString(orderedCode.readBytes());
+    }
+
+    @Override
+    String decodeDecreasing(OrderedCode orderedCode) {
+      return bytesToHexString(orderedCode.readBytesDecreasing());
+    }
+
+    private static BytesTest testCase(
+        String value, String increasingBytes, String decreasingBytes) {
+      return new AutoValue_OrderedCodeTest_BytesTest(
+          value, increasingBytes, decreasingBytes);
+    }
+
+    /** Test cases for byte arrays, in increasing order by value. */
+    private static final ImmutableList<BytesTest> TEST_CASES =
+        ImmutableList.of(
+            testCase("", "0001", "fffe"),
+            testCase("00", "00ff0001", "ff00fffe"),
+            testCase("0000", "00ff00ff0001", "ff00ff00fffe"),
+            testCase("0001", "00ff010001", "ff00fefffe"),
+            testCase("0041", "00ff410001", "ff00befffe"),
+            testCase("00ff", "00ffff000001", "ff0000fffffe"),
+            testCase("01", "010001", "fefffe"),
+            testCase("0100", "0100ff0001", "feff00fffe"),
+            testCase("6f776c", "6f776c0001", "908893fffe"),
+            testCase("ff", "ff000001", "00fffffe"),
+            testCase("ff00", "ff0000ff0001", "00ffff00fffe"),
+            testCase("ff01", "ff00010001", "00fffefffe"),
+            testCase("ffff", "ff00ff000001", "00ff00fffffe"),
+            testCase("ffffff", "ff00ff00ff000001", "00ff00ff00fffffe"));
+  }
+
+  /** Checks the expected encodings of the unsigned number test cases. */
+  @Test
+  public void testUnsignedEncoding() {
+    List<UnsignedNumber> unsignedCases = UnsignedNumber.TEST_CASES;
+    testEncoding(unsignedCases);
+  }
+
+  /** Checks that the unsigned number test cases decode to their values. */
+  @Test
+  public void testUnsignedDecoding() {
+    List<UnsignedNumber> unsignedCases = UnsignedNumber.TEST_CASES;
+    testDecoding(unsignedCases);
+  }
+
+  /** Checks the sort order of the encoded unsigned number test cases. */
+  @Test
+  public void testUnsignedOrdering() {
+    List<UnsignedNumber> unsignedCases = UnsignedNumber.TEST_CASES;
+    testOrdering(unsignedCases);
+  }
+
+  /** Checks the expected encodings of the byte array test cases. */
+  @Test
+  public void testBytesEncoding() {
+    List<BytesTest> bytesCases = BytesTest.TEST_CASES;
+    testEncoding(bytesCases);
+  }
+
+  /** Checks that the byte array test cases decode to their values. */
+  @Test
+  public void testBytesDecoding() {
+    List<BytesTest> bytesCases = BytesTest.TEST_CASES;
+    testDecoding(bytesCases);
+  }
+
+  /** Checks the sort order of the encoded byte array test cases. */
+  @Test
+  public void testBytesOrdering() {
+    List<BytesTest> bytesCases = BytesTest.TEST_CASES;
+    testOrdering(bytesCases);
+  }
+
+  /**
+   * Encodes each test case in both orders and checks the resulting bytes
+   * against the expected hex encodings.
+   */
+  private void testEncoding(List<? extends CodingTestCase<?>> testCases) {
+    for (int i = 0; i < testCases.size(); i++) {
+      CodingTestCase<?> current = testCases.get(i);
+
+      byte[] gotIncreasing = current.encodeIncreasing();
+      byte[] wantIncreasing = bytesFromHexString(current.increasingBytes());
+      assertEquals(0, compare(gotIncreasing, wantIncreasing));
+
+      byte[] gotDecreasing = current.encodeDecreasing();
+      byte[] wantDecreasing = bytesFromHexString(current.decreasingBytes());
+      assertEquals(0, compare(gotDecreasing, wantDecreasing));
+    }
+  }
+
+  /**
+   * Decodes the expected increasing and decreasing encodings of each test
+   * case and checks that both yield the test value.
+   */
+  private void testDecoding(List<? extends CodingTestCase<?>> testCases) {
+    for (int i = 0; i < testCases.size(); i++) {
+      CodingTestCase<?> current = testCases.get(i);
+
+      Object wantIncreasing = current.value();
+      Object gotIncreasing = current.decodeIncreasing();
+      assertEquals(wantIncreasing, gotIncreasing);
+
+      Object wantDecreasing = current.value();
+      Object gotDecreasing = current.decodeDecreasing();
+      assertEquals(wantDecreasing, gotDecreasing);
+    }
+  }
+
+  /**
+   * Checks every pair of adjacent test cases: the increasing encodings must
+   * compare as ascending and the decreasing encodings as descending.
+   */
+  private void testOrdering(List<? extends CodingTestCase<?>> testCases) {
+    // The C++ encodings make this verifiable by inspection, but it seems
+    // worth checking explicitly.
+    for (int next = 1; next < testCases.size(); next++) {
+      CodingTestCase<?> earlier = testCases.get(next - 1);
+      CodingTestCase<?> later = testCases.get(next);
+
+      byte[] earlierIncreasing = earlier.encodeIncreasing();
+      byte[] laterIncreasing = later.encodeIncreasing();
+      assertTrue(compare(earlierIncreasing, laterIncreasing) < 0);
+
+      byte[] earlierDecreasing = earlier.encodeDecreasing();
+      byte[] laterDecreasing = later.encodeDecreasing();
+      assertTrue(compare(earlierDecreasing, laterDecreasing) > 0);
+    }
+  }
+
+  /**
+   * Checks that {@code readInfinity} throws an IllegalArgumentException on an
+   * empty {@code OrderedCode}, returns true after {@code writeInfinity}, and
+   * throws again on the following read.
+   */
+  @Test
+  public void testWriteInfinity() {
+    OrderedCode code = new OrderedCode();
+
+    // Nothing has been written yet.
+    try {
+      code.readInfinity();
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+
+    code.writeInfinity();
+    assertTrue(code.readInfinity());
+
+    // The single infinity written above has already been read.
+    try {
+      code.readInfinity();
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * Checks that {@code readInfinityDecreasing} throws an
+   * IllegalArgumentException on an empty {@code OrderedCode}, returns true
+   * after {@code writeInfinityDecreasing}, and throws again on the following
+   * read.
+   */
+  @Test
+  public void testWriteInfinityDecreasing() {
+    OrderedCode code = new OrderedCode();
+
+    // Nothing has been written yet.
+    try {
+      code.readInfinityDecreasing();
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+
+    code.writeInfinityDecreasing();
+    assertTrue(code.readInfinityDecreasing());
+
+    // The single infinity written above has already been read.
+    try {
+      code.readInfinityDecreasing();
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * Round-trips byte arrays through {@code writeBytes} and {@code readBytes},
+   * including appending to an {@code OrderedCode} built from already encoded
+   * bytes.
+   */
+  @Test
+  public void testWriteBytes() {
+    byte[] head = { 'a', 'b', 'c'};
+    byte[] middle = { 'd', 'e', 'f'};
+    byte[] tail = { 'x', 'y', 'z'};
+
+    // Write one value, capture its encoding, and read it back.
+    OrderedCode code = new OrderedCode();
+    code.writeBytes(head);
+    byte[] headEncoded = code.getEncodedBytes();
+    assertTrue(Arrays.equals(head, code.readBytes()));
+
+    // Write three values, capture the encoding, and read them back in order.
+    code.writeBytes(head);
+    code.writeBytes(middle);
+    code.writeBytes(tail);
+    byte[] allEncoded = code.getEncodedBytes();
+    assertTrue(Arrays.equals(head, code.readBytes()));
+    assertTrue(Arrays.equals(middle, code.readBytes()));
+    assertTrue(Arrays.equals(tail, code.readBytes()));
+
+    // Start from the encoding of the first value and append the other two.
+    code = new OrderedCode(headEncoded);
+    code.writeBytes(middle);
+    code.writeBytes(tail);
+    assertTrue(Arrays.equals(allEncoded, code.getEncodedBytes()));
+    assertTrue(Arrays.equals(head, code.readBytes()));
+    assertTrue(Arrays.equals(middle, code.readBytes()));
+    assertTrue(Arrays.equals(tail, code.readBytes()));
+
+    // Decode directly from the full encoding.
+    code = new OrderedCode(allEncoded);
+    assertTrue(Arrays.equals(head, code.readBytes()));
+    assertTrue(Arrays.equals(middle, code.readBytes()));
+    assertTrue(Arrays.equals(tail, code.readBytes()));
+  }
+
+  /**
+   * Round-trips byte arrays through {@code writeBytesDecreasing} and
+   * {@code readBytesDecreasing}, including appending to an
+   * {@code OrderedCode} built from already encoded bytes.
+   */
+  @Test
+  public void testWriteBytesDecreasing() {
+    byte[] head = { 'a', 'b', 'c'};
+    byte[] middle = { 'd', 'e', 'f'};
+    byte[] tail = { 'x', 'y', 'z'};
+
+    // Write one value, capture its encoding, and read it back.
+    OrderedCode code = new OrderedCode();
+    code.writeBytesDecreasing(head);
+    byte[] headEncoded = code.getEncodedBytes();
+    assertTrue(Arrays.equals(head, code.readBytesDecreasing()));
+
+    // Write three values, capture the encoding, and read them back in order.
+    code.writeBytesDecreasing(head);
+    code.writeBytesDecreasing(middle);
+    code.writeBytesDecreasing(tail);
+    byte[] allEncoded = code.getEncodedBytes();
+    assertTrue(Arrays.equals(head, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(middle, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(tail, code.readBytesDecreasing()));
+
+    // Start from the encoding of the first value and append the other two.
+    code = new OrderedCode(headEncoded);
+    code.writeBytesDecreasing(middle);
+    code.writeBytesDecreasing(tail);
+    assertTrue(Arrays.equals(allEncoded, code.getEncodedBytes()));
+    assertTrue(Arrays.equals(head, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(middle, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(tail, code.readBytesDecreasing()));
+
+    // Decode directly from the full encoding.
+    code = new OrderedCode(allEncoded);
+    assertTrue(Arrays.equals(head, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(middle, code.readBytesDecreasing()));
+    assertTrue(Arrays.equals(tail, code.readBytesDecreasing()));
+  }
+
+  @Test
+  public void testWriteNumIncreasing() {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumIncreasing(0);
+    orderedCode.writeNumIncreasing(1);
+    orderedCode.writeNumIncreasing(Long.MIN_VALUE);
+    orderedCode.writeNumIncreasing(Long.MAX_VALUE);
+    assertEquals(0, orderedCode.readNumIncreasing());
+    assertEquals(1, orderedCode.readNumIncreasing());
+    assertEquals(Long.MIN_VALUE, orderedCode.readNumIncreasing());
+    assertEquals(Long.MAX_VALUE, orderedCode.readNumIncreasing());
+  }
+
+  @Test
+  public void testWriteNumIncreasing_unsignedInt() {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(0));
+    orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(1));
+    orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(Integer.MIN_VALUE));
+    orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(Integer.MAX_VALUE));
+    assertEquals(0, orderedCode.readNumIncreasing());
+    assertEquals(1, orderedCode.readNumIncreasing());
+    assertEquals(Long.valueOf(Integer.MAX_VALUE) + 1L, orderedCode.readNumIncreasing());
+    assertEquals(Integer.MAX_VALUE, orderedCode.readNumIncreasing());
+  }
+
+  @Test
+  public void testWriteNumDecreasing() {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumDecreasing(0);
+    orderedCode.writeNumDecreasing(1);
+    orderedCode.writeNumDecreasing(Long.MIN_VALUE);
+    orderedCode.writeNumDecreasing(Long.MAX_VALUE);
+    assertEquals(0, orderedCode.readNumDecreasing());
+    assertEquals(1, orderedCode.readNumDecreasing());
+    assertEquals(Long.MIN_VALUE, orderedCode.readNumDecreasing());
+    assertEquals(Long.MAX_VALUE, orderedCode.readNumDecreasing());
+  }
+
+  @Test
+  public void testWriteNumDecreasing_unsignedInt() {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(0));
+    orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(1));
+    orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(Integer.MIN_VALUE));
+    orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(Integer.MAX_VALUE));
+    assertEquals(0, orderedCode.readNumDecreasing());
+    assertEquals(1, orderedCode.readNumDecreasing());
+    assertEquals(Long.valueOf(Integer.MAX_VALUE) + 1L, orderedCode.readNumDecreasing());
+    assertEquals(Integer.MAX_VALUE, orderedCode.readNumDecreasing());
+  }
+
+  /**
+   * Asserts that {@link OrderedCode#writeSignedNumIncreasing(long)} encodes
+   * the given long as the bytes described by the given string of hex digits.
+   * For example, assertSignedNumIncreasingEncodingEquals("3fbf", -65) asserts
+   * that -65 is encoded as { (byte) 0x3f, (byte) 0xbf }.
+   */
+  private static void assertSignedNumIncreasingEncodingEquals(
+      String expectedHexEncoding, long num) {
+    OrderedCode encoder = new OrderedCode();
+    encoder.writeSignedNumIncreasing(num);
+
+    String failureMessage = "Unexpected encoding for " + num;
+    String actualHexEncoding = bytesToHexString(encoder.getEncodedBytes());
+    assertEquals(failureMessage, expectedHexEncoding, actualHexEncoding);
+  }
+
+  /**
+   * Assert that encoding various long values via
+   * {@link OrderedCode#writeSignedNumIncreasing(long)} produces the expected
+   * bytes. Expected byte sequences were generated via the C++ (authoritative)
+   * implementation of OrderedCode::WriteSignedNumIncreasing.
+   *
+   * <p>The values cover the {@code Long} and {@code Integer} extremes and both
+   * sides of the -65/-64 and 63/64 boundaries, where the expected encoding
+   * switches between two bytes and one byte.
+   */
+  @Test
+  public void testSignedNumIncreasing_write() {
+    assertSignedNumIncreasingEncodingEquals(
+        "003f8000000000000000", Long.MIN_VALUE);
+    assertSignedNumIncreasingEncodingEquals(
+        "003f8000000000000001", Long.MIN_VALUE + 1);
+    assertSignedNumIncreasingEncodingEquals(
+        "077fffffff", Integer.MIN_VALUE - 1L);
+    assertSignedNumIncreasingEncodingEquals("0780000000", Integer.MIN_VALUE);
+    assertSignedNumIncreasingEncodingEquals(
+        "0780000001", Integer.MIN_VALUE + 1);
+    assertSignedNumIncreasingEncodingEquals("3fbf", -65);
+    assertSignedNumIncreasingEncodingEquals("40", -64);
+    assertSignedNumIncreasingEncodingEquals("41", -63);
+    assertSignedNumIncreasingEncodingEquals("7d", -3);
+    assertSignedNumIncreasingEncodingEquals("7e", -2);
+    assertSignedNumIncreasingEncodingEquals("7f", -1);
+    assertSignedNumIncreasingEncodingEquals("80", 0);
+    assertSignedNumIncreasingEncodingEquals("81", 1);
+    assertSignedNumIncreasingEncodingEquals("82", 2);
+    assertSignedNumIncreasingEncodingEquals("83", 3);
+    assertSignedNumIncreasingEncodingEquals("bf", 63);
+    assertSignedNumIncreasingEncodingEquals("c040", 64);
+    assertSignedNumIncreasingEncodingEquals("c041", 65);
+    assertSignedNumIncreasingEncodingEquals(
+        "f87ffffffe", Integer.MAX_VALUE - 1);
+    assertSignedNumIncreasingEncodingEquals("f87fffffff", Integer.MAX_VALUE);
+    assertSignedNumIncreasingEncodingEquals(
+        "f880000000", Integer.MAX_VALUE + 1L);
+    assertSignedNumIncreasingEncodingEquals(
+        "ffc07ffffffffffffffe", Long.MAX_VALUE - 1);
+    assertSignedNumIncreasingEncodingEquals(
+        "ffc07fffffffffffffff", Long.MAX_VALUE);
+  }
+
+  /**
+   * Decodes a string of lower-case hex digits into the bytes it describes;
+   * for example "3fbf" becomes { (byte) 0x3f, (byte) 0xbf }.
+   */
+  private static byte[] bytesFromHexString(String hexDigits) {
+    BaseEncoding lowerCaseHex = BaseEncoding.base16().lowerCase();
+    return lowerCaseHex.decode(hexDigits);
+  }
+
+  /**
+   * Encodes a byte[] as a string of lower-case hex digits; for example
+   * { (byte) 0x3f, (byte) 0xbf } becomes "3fbf".
+   */
+  private static String bytesToHexString(byte[] bytes) {
+    BaseEncoding lowerCaseHex = BaseEncoding.base16().lowerCase();
+    return lowerCaseHex.encode(bytes);
+  }
+
+  /**
+   * Asserts that {@link OrderedCode#readSignedNumIncreasing()} decodes the
+   * bytes described by the given string of hex digits to the expected long,
+   * and that no encoded bytes remain afterwards.
+   * For example, assertDecodedSignedNumIncreasingEquals(-65, "3fbf") asserts
+   * that the byte array { (byte) 0x3f, (byte) 0xbf } is decoded as -65.
+   */
+  private static void assertDecodedSignedNumIncreasingEquals(
+      long expectedNum, String encodedHexString) {
+    byte[] encoded = bytesFromHexString(encodedHexString);
+    OrderedCode decoder = new OrderedCode(encoded);
+
+    String valueMessage = "Unexpected value when decoding 0x" + encodedHexString;
+    assertEquals(valueMessage, expectedNum, decoder.readSignedNumIncreasing());
+
+    String leftoverMessage =
+        "Unexpected encoded bytes remain after decoding 0x" + encodedHexString;
+    assertFalse(leftoverMessage, decoder.hasRemainingEncodedBytes());
+  }
+
+  /**
+   * Assert that decoding various sequences of bytes via
+   * {@link OrderedCode#readSignedNumIncreasing()} produces the expected long
+   * value.
+   * Input byte sequences were generated via the C++ (authoritative)
+   * implementation of OrderedCode::WriteSignedNumIncreasing.
+   *
+   * <p>Each call also checks that no encoded bytes remain after the value
+   * has been read.
+   */
+  @Test
+  public void testSignedNumIncreasing_read() {
+    assertDecodedSignedNumIncreasingEquals(
+        Long.MIN_VALUE, "003f8000000000000000");
+    assertDecodedSignedNumIncreasingEquals(
+        Long.MIN_VALUE + 1, "003f8000000000000001");
+    assertDecodedSignedNumIncreasingEquals(
+        Integer.MIN_VALUE - 1L, "077fffffff");
+    assertDecodedSignedNumIncreasingEquals(Integer.MIN_VALUE, "0780000000");
+    assertDecodedSignedNumIncreasingEquals(Integer.MIN_VALUE + 1, "0780000001");
+    assertDecodedSignedNumIncreasingEquals(-65, "3fbf");
+    assertDecodedSignedNumIncreasingEquals(-64, "40");
+    assertDecodedSignedNumIncreasingEquals(-63, "41");
+    assertDecodedSignedNumIncreasingEquals(-3, "7d");
+    assertDecodedSignedNumIncreasingEquals(-2, "7e");
+    assertDecodedSignedNumIncreasingEquals(-1, "7f");
+    assertDecodedSignedNumIncreasingEquals(0, "80");
+    assertDecodedSignedNumIncreasingEquals(1, "81");
+    assertDecodedSignedNumIncreasingEquals(2, "82");
+    assertDecodedSignedNumIncreasingEquals(3, "83");
+    assertDecodedSignedNumIncreasingEquals(63, "bf");
+    assertDecodedSignedNumIncreasingEquals(64, "c040");
+    assertDecodedSignedNumIncreasingEquals(65, "c041");
+    assertDecodedSignedNumIncreasingEquals(Integer.MAX_VALUE - 1, "f87ffffffe");
+    assertDecodedSignedNumIncreasingEquals(Integer.MAX_VALUE, "f87fffffff");
+    assertDecodedSignedNumIncreasingEquals(
+        Integer.MAX_VALUE + 1L, "f880000000");
+    assertDecodedSignedNumIncreasingEquals(
+        Long.MAX_VALUE - 1, "ffc07ffffffffffffffe");
+    assertDecodedSignedNumIncreasingEquals(
+        Long.MAX_VALUE, "ffc07fffffffffffffff");
+  }
+
+  /**
+   * Asserts that a long written with
+   * {@link OrderedCode#writeSignedNumIncreasing(long)} is returned unchanged
+   * by {@link OrderedCode#readSignedNumIncreasing()}, and that no encoded
+   * bytes remain afterwards.
+   */
+  private static void assertSignedNumIncreasingWriteAndReadIsLossless(
+      long num) {
+    OrderedCode code = new OrderedCode();
+    code.writeSignedNumIncreasing(num);
+
+    String valueMessage =
+        "Unexpected result when decoding writeSignedNumIncreasing(" + num + ")";
+    assertEquals(valueMessage, num, code.readSignedNumIncreasing());
+
+    String leftoverMessage =
+        "Unexpected remaining encoded bytes after decoding " + num;
+    assertFalse(leftoverMessage, code.hasRemainingEncodedBytes());
+  }
+
+  /**
+   * Assert that for various long values, encoding (via
+   * {@link OrderedCode#writeSignedNumIncreasing(long)}) and then decoding (via
+   * {@link OrderedCode#readSignedNumIncreasing()}) results in the original
+   * value.
+   */
+  @Test
+  public void testSignedNumIncreasing_writeAndRead() {
+    assertSignedNumIncreasingWriteAndReadIsLossless(Long.MIN_VALUE);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Long.MIN_VALUE + 1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE - 1L);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE + 1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-65);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-64);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-63);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-3);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-2);
+    assertSignedNumIncreasingWriteAndReadIsLossless(-1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(0);
+    assertSignedNumIncreasingWriteAndReadIsLossless(1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(2);
+    assertSignedNumIncreasingWriteAndReadIsLossless(3);
+    assertSignedNumIncreasingWriteAndReadIsLossless(63);
+    assertSignedNumIncreasingWriteAndReadIsLossless(64);
+    assertSignedNumIncreasingWriteAndReadIsLossless(65);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE - 1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE + 1L);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Long.MAX_VALUE - 1);
+    assertSignedNumIncreasingWriteAndReadIsLossless(Long.MAX_VALUE);
+  }
+
+  /**
+   * Asserts that a long written with
+   * {@link OrderedCode#writeSignedNumDecreasing(long)} is returned unchanged
+   * by {@link OrderedCode#readSignedNumDecreasing()}, and that no encoded
+   * bytes remain afterwards.
+   */
+  private static void assertSignedNumDecreasingWriteAndReadIsLossless(
+      long num) {
+    OrderedCode code = new OrderedCode();
+    code.writeSignedNumDecreasing(num);
+
+    String valueMessage =
+        "Unexpected result when decoding writeSignedNumDecreasing(" + num + ")";
+    assertEquals(valueMessage, num, code.readSignedNumDecreasing());
+
+    String leftoverMessage =
+        "Unexpected remaining encoded bytes after decoding " + num;
+    assertFalse(leftoverMessage, code.hasRemainingEncodedBytes());
+  }
+
+  /**
+   * Assert that for various long values, encoding (via
+   * {@link OrderedCode#writeSignedNumDecreasing(long)}) and then decoding (via
+   * {@link OrderedCode#readSignedNumDecreasing()}) results in the original
+   * value.
+   */
+  @Test
+  public void testSignedNumDecreasing_writeAndRead() {
+    assertSignedNumDecreasingWriteAndReadIsLossless(Long.MIN_VALUE);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Long.MIN_VALUE + 1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE - 1L);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE + 1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-65);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-64);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-63);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-3);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-2);
+    assertSignedNumDecreasingWriteAndReadIsLossless(-1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(0);
+    assertSignedNumDecreasingWriteAndReadIsLossless(1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(2);
+    assertSignedNumDecreasingWriteAndReadIsLossless(3);
+    assertSignedNumDecreasingWriteAndReadIsLossless(63);
+    assertSignedNumDecreasingWriteAndReadIsLossless(64);
+    assertSignedNumDecreasingWriteAndReadIsLossless(65);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE - 1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE + 1L);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Long.MAX_VALUE - 1);
+    assertSignedNumDecreasingWriteAndReadIsLossless(Long.MAX_VALUE);
+  }
+
+  /**
+   * Ensures that numbers encoded as "decreasing" do indeed sort in reverse
+   * order.
+   */
+  @Test
+  public void testDecreasing() {
+    OrderedCode tenCode = new OrderedCode();
+    tenCode.writeSignedNumDecreasing(10L);
+    byte[] encodedTen = tenCode.getEncodedBytes();
+
+    OrderedCode twentyCode = new OrderedCode();
+    twentyCode.writeSignedNumDecreasing(20L);
+    byte[] encodedTwenty = twentyCode.getEncodedBytes();
+
+    // In decreasing order, twenty precedes ten.
+    assertTrue(compare(encodedTwenty, encodedTen) < 0);
+  }
+
+  /**
+   * Checks {@code log2Floor} for positive inputs, including values just
+   * below and at powers of two and {@code Long.MAX_VALUE}.
+   */
+  @Test
+  public void testLog2Floor_Positive() {
+    OrderedCode code = new OrderedCode();
+    assertEquals(0, code.log2Floor(1));
+    assertEquals(1, code.log2Floor(2));
+    assertEquals(1, code.log2Floor(3));
+    assertEquals(2, code.log2Floor(4));
+    assertEquals(5, code.log2Floor(63));
+    assertEquals(6, code.log2Floor(64));
+    assertEquals(62, code.log2Floor(Long.MAX_VALUE));
+  }
+
+  /**
+   * OrderedCode.log2Floor(long) is defined to return -1 given an input of zero
+   * (because that's what Bits::Log2Floor64(uint64) does).
+   */
+  @Test
+  public void testLog2Floor_zero() {
+    OrderedCode orderedCode = new OrderedCode();
+    assertEquals(-1, orderedCode.log2Floor(0));
+  }
+
+  /**
+   * Checks that {@code log2Floor} rejects a negative input with an
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testLog2Floor_negative() {
+    OrderedCode code = new OrderedCode();
+    try {
+      code.log2Floor(-1);
+      fail("Expected an IllegalArgumentException.");
+    } catch (IllegalArgumentException rejected) {
+      // Expected!
+    }
+  }
+
+  /**
+   * Checks {@code getSignedEncodingLength} on both sides of the boundaries
+   * where the expected length changes (1/2 bytes at -65/-64 and 63/64, 2/3
+   * bytes at -8193/-8192 and 8191/8192, 9/10 bytes around -(2^62) and 2^62)
+   * and at the {@code Long} extremes.
+   */
+  @Test
+  public void testGetSignedEncodingLength() {
+    OrderedCode orderedCode = new OrderedCode();
+    assertEquals(10, orderedCode.getSignedEncodingLength(Long.MIN_VALUE));
+    assertEquals(10, orderedCode.getSignedEncodingLength(~(1L << 62)));
+    assertEquals(9, orderedCode.getSignedEncodingLength(~(1L << 62) + 1));
+    assertEquals(3, orderedCode.getSignedEncodingLength(-8193));
+    assertEquals(2, orderedCode.getSignedEncodingLength(-8192));
+    assertEquals(2, orderedCode.getSignedEncodingLength(-65));
+    assertEquals(1, orderedCode.getSignedEncodingLength(-64));
+    assertEquals(1, orderedCode.getSignedEncodingLength(-2));
+    assertEquals(1, orderedCode.getSignedEncodingLength(-1));
+    assertEquals(1, orderedCode.getSignedEncodingLength(0));
+    assertEquals(1, orderedCode.getSignedEncodingLength(1));
+    assertEquals(1, orderedCode.getSignedEncodingLength(63));
+    assertEquals(2, orderedCode.getSignedEncodingLength(64));
+    assertEquals(2, orderedCode.getSignedEncodingLength(8191));
+    assertEquals(3, orderedCode.getSignedEncodingLength(8192));
+    // The "- 1" belongs inside the call: this checks the largest value
+    // expected to need 9 bytes, mirroring the ~(1L << 62) + 1 case above.
+    assertEquals(9, orderedCode.getSignedEncodingLength((1L << 62) - 1));
+    assertEquals(10, orderedCode.getSignedEncodingLength(1L << 62));
+    assertEquals(10, orderedCode.getSignedEncodingLength(Long.MAX_VALUE));
+  }
+
+  /**
+   * Checks that bytes written with {@code writeTrailingBytes} appear
+   * unchanged in the encoded output, even when they are the special
+   * escape/separator values, and are returned unchanged by
+   * {@code readTrailingBytes}.
+   */
+  @Test
+  public void testWriteTrailingBytes() {
+    byte[] specialBytes = new byte[] { OrderedCode.ESCAPE1,
+        OrderedCode.NULL_CHARACTER, OrderedCode.SEPARATOR, OrderedCode.ESCAPE2,
+        OrderedCode.INFINITY, OrderedCode.FF_CHARACTER};
+    byte[] plainBytes = new byte[] { 'a', 'b', 'c', 'd', 'e' };
+
+    OrderedCode code = new OrderedCode();
+    code.writeTrailingBytes(specialBytes);
+    assertTrue(Arrays.equals(specialBytes, code.getEncodedBytes()));
+    assertTrue(Arrays.equals(specialBytes, code.readTrailingBytes()));
+    // After the trailing bytes have been read, readInfinity must fail.
+    try {
+      code.readInfinity();
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+
+    code = new OrderedCode();
+    code.writeTrailingBytes(plainBytes);
+    assertTrue(Arrays.equals(plainBytes, code.getEncodedBytes()));
+    assertTrue(Arrays.equals(plainBytes, code.readTrailingBytes()));
+  }
+
+  /**
+   * Writes a mix of byte arrays, infinity, unsigned numbers, signed numbers
+   * and trailing bytes, then reads everything back twice: first from the
+   * {@code OrderedCode} that was written to, then from a new instance built
+   * from the captured encoded bytes.
+   */
+  @Test
+  public void testMixedWrite() {
+    byte[] head = { 'a', 'b', 'c'};
+    byte[] middle = { 'd', 'e', 'f'};
+    byte[] tail = { 'x', 'y', 'z'};
+    byte[] trailing = new byte[] { OrderedCode.ESCAPE1,
+        OrderedCode.NULL_CHARACTER, OrderedCode.SEPARATOR, OrderedCode.ESCAPE2,
+        OrderedCode.INFINITY, OrderedCode.FF_CHARACTER};
+
+    OrderedCode writer = new OrderedCode();
+    writer.writeBytes(head);
+    writer.writeBytes(middle);
+    writer.writeBytes(tail);
+    writer.writeInfinity();
+    writer.writeNumIncreasing(0);
+    writer.writeNumIncreasing(1);
+    writer.writeNumIncreasing(Long.MIN_VALUE);
+    writer.writeNumIncreasing(Long.MAX_VALUE);
+    writer.writeSignedNumIncreasing(0);
+    writer.writeSignedNumIncreasing(1);
+    writer.writeSignedNumIncreasing(Long.MIN_VALUE);
+    writer.writeSignedNumIncreasing(Long.MAX_VALUE);
+    writer.writeTrailingBytes(trailing);
+    byte[] allEncoded = writer.getEncodedBytes();
+
+    for (int pass = 0; pass < 2; pass++) {
+      // The first pass reads from the writer itself; the second pass decodes
+      // the captured bytes with a new instance created only at that point.
+      OrderedCode reader = (pass == 0) ? writer : new OrderedCode(allEncoded);
+
+      assertTrue(Arrays.equals(head, reader.readBytes()));
+      assertTrue(Arrays.equals(middle, reader.readBytes()));
+      assertFalse(reader.readInfinity());
+      assertTrue(Arrays.equals(tail, reader.readBytes()));
+      assertTrue(reader.readInfinity());
+      assertEquals(0, reader.readNumIncreasing());
+      assertEquals(1, reader.readNumIncreasing());
+      assertFalse(reader.readInfinity());
+      assertEquals(Long.MIN_VALUE, reader.readNumIncreasing());
+      assertEquals(Long.MAX_VALUE, reader.readNumIncreasing());
+      assertEquals(0, reader.readSignedNumIncreasing());
+      assertEquals(1, reader.readSignedNumIncreasing());
+      assertFalse(reader.readInfinity());
+      assertEquals(Long.MIN_VALUE, reader.readSignedNumIncreasing());
+      assertEquals(Long.MAX_VALUE, reader.readSignedNumIncreasing());
+      assertTrue(Arrays.equals(trailing, reader.getEncodedBytes()));
+      assertTrue(Arrays.equals(trailing, reader.readTrailingBytes()));
+    }
+  }
+
+  /**
+   * Pins the exact encoded form of single-byte {@code FF_CHARACTER} and {@code NULL_CHARACTER}
+   * fields and of infinity, checks that those forms decode again, and checks that a malformed
+   * escape sequence makes {@code readBytes()} throw while the input is still reported as
+   * having remaining bytes.
+   */
+  @Test
+  public void testEdgeCases() {
+    // Raw single-byte payloads.
+    byte[] singleFf = {OrderedCode.FF_CHARACTER};
+    byte[] singleNull = {OrderedCode.NULL_CHARACTER};
+
+    // Encoded forms the test expects for each payload and terminator.
+    byte[] ffSeq = {OrderedCode.ESCAPE1, OrderedCode.NULL_CHARACTER};
+    byte[] nullSeq = {OrderedCode.ESCAPE2, OrderedCode.FF_CHARACTER};
+    byte[] separatorSeq = {OrderedCode.ESCAPE1, OrderedCode.SEPARATOR};
+    byte[] infinitySeq = {OrderedCode.ESCAPE2, OrderedCode.INFINITY};
+
+    // Write all three fields and compare against the hand-assembled encoding.
+    OrderedCode written = new OrderedCode();
+    written.writeBytes(singleFf);
+    written.writeBytes(singleNull);
+    written.writeInfinity();
+    byte[] expectedEncoding =
+        Bytes.concat(ffSeq, separatorSeq, nullSeq, separatorSeq, infinitySeq);
+    assertTrue(Arrays.equals(written.getEncodedBytes(), expectedEncoding));
+    assertTrue(Arrays.equals(written.readBytes(), singleFf));
+    assertTrue(Arrays.equals(written.readBytes(), singleNull));
+    assertTrue(written.readInfinity());
+
+    // Each single-field encoding decodes on its own.
+    OrderedCode ffOnly = new OrderedCode(Bytes.concat(ffSeq, separatorSeq));
+    assertTrue(Arrays.equals(ffOnly.readBytes(), singleFf));
+
+    OrderedCode nullOnly = new OrderedCode(Bytes.concat(nullSeq, separatorSeq));
+    assertTrue(Arrays.equals(nullOnly.readBytes(), singleNull));
+
+    // ESCAPE2 followed by ESCAPE2 is expected to be rejected by readBytes().
+    byte[] malformedBytes = {
+        OrderedCode.ESCAPE2, OrderedCode.ESCAPE2, OrderedCode.ESCAPE1, OrderedCode.SEPARATOR};
+    OrderedCode malformed = new OrderedCode(malformedBytes);
+    try {
+      malformed.readBytes();
+      fail("Should have failed.");
+    } catch (Exception expected) {
+      // Expected: the encoding is invalid.
+    }
+    // After the failed read the instance must still report unread bytes.
+    assertTrue(malformed.hasRemainingEncodedBytes());
+  }
+
+  /**
+   * Checks that {@code hasRemainingEncodedBytes()} is {@code false} on an empty instance,
+   * becomes {@code true} after each kind of write, and goes back to {@code false} once
+   * everything written has been read.
+   */
+  @Test
+  public void testHasRemainingEncodedBytes() {
+    byte[] bytes = { 'a', 'b', 'c'};
+    long number = 12345;
+
+    // Empty
+    OrderedCode orderedCode = new OrderedCode();
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    // First and only field of each type.
+    orderedCode.writeBytes(bytes);
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertTrue(Arrays.equals(orderedCode.readBytes(), bytes));
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages
+    // report the right value as "expected".
+    orderedCode.writeNumIncreasing(number);
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertEquals(number, orderedCode.readNumIncreasing());
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    orderedCode.writeSignedNumIncreasing(number);
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertEquals(number, orderedCode.readSignedNumIncreasing());
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    orderedCode.writeInfinity();
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertTrue(orderedCode.readInfinity());
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    orderedCode.writeTrailingBytes(bytes);
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), bytes));
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+
+    // Two fields of same type: bytes remain until both have been read.
+    orderedCode.writeBytes(bytes);
+    orderedCode.writeBytes(bytes);
+    assertTrue(orderedCode.hasRemainingEncodedBytes());
+    assertTrue(Arrays.equals(orderedCode.readBytes(), bytes));
+    assertTrue(Arrays.equals(orderedCode.readBytes(), bytes));
+    assertFalse(orderedCode.hasRemainingEncodedBytes());
+  }
+
+  @Test
+  public void testOrderingInfinity() {
+    OrderedCode inf = new OrderedCode();
+    inf.writeInfinity();
+
+    OrderedCode negInf = new OrderedCode();
+    negInf.writeInfinityDecreasing();
+
+    OrderedCode longValue = new OrderedCode();
+    longValue.writeSignedNumIncreasing(1);
+
+    assertTrue(compare(inf.getEncodedBytes(), negInf.getEncodedBytes()) > 0);
+    assertTrue(compare(longValue.getEncodedBytes(), negInf.getEncodedBytes()) > 0);
+    assertTrue(compare(inf.getEncodedBytes(), longValue.getEncodedBytes()) > 0);
+  }
+
+  /**
+   * Compares two byte arrays with Guava's unsigned lexicographical comparator.
+   *
+   * @return a negative, zero, or positive value as reported by that comparator
+   */
+  private int compare(byte[] bytes1, byte[] bytes2) {
+    int ordering = UnsignedBytes.lexicographicalComparator().compare(bytes1, bytes2);
+    return ordering;
+  }
+}


[2/2] beam git commit: This closes #3925

Posted by ch...@apache.org.
This closes #3925


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/480e60fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/480e60fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/480e60fa

Branch: refs/heads/master
Commit: 480e60fa4c808061b28f9662773c03180a1f5bf3
Parents: 63b54a5 57bde53
Author: chamikara@google.com <ch...@google.com>
Authored: Fri Oct 6 09:05:24 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Oct 6 09:05:24 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/OrderedCode.java    | 764 ++++++++++++++++
 .../sdk/io/gcp/spanner/OrderedCodeTest.java     | 890 +++++++++++++++++++
 2 files changed, 1654 insertions(+)
----------------------------------------------------------------------