You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/05/12 01:36:22 UTC

[kudu] branch master updated: KUDU-2671 implement decoding of RowOperationsPB in Java client

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e19a6822 KUDU-2671 implement decoding of RowOperationsPB in Java client
1e19a6822 is described below

commit 1e19a68221fbdc4151cac077f6a147e415430c30
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Apr 29 22:01:08 2022 -0700

    KUDU-2671 implement decoding of RowOperationsPB in Java client
    
    This patch implements decoding of range partitioning information
    out of RowOperationsPB.  A few tests added as well to cover the
    new functionality.
    
    In the context of supporting custom hash schemas per range in the
    Kudu Java client, a few follow-up changelists need the newly introduced
    functionality.
    
    Change-Id: I4f69f89f6b9e47d79b83c2109d85a95288bec380
    Reviewed-on: http://gerrit.cloudera.org:8080/18494
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
    Tested-by: Alexey Serbin <al...@apache.org>
---
 .../src/main/java/org/apache/kudu/Type.java        |  17 +-
 .../java/org/apache/kudu/client/KeyEncoder.java    |  15 +-
 .../java/org/apache/kudu/client/Operation.java     | 164 +++++++++-
 .../java/org/apache/kudu/client/TestOperation.java | 330 ++++++++++++++++++++-
 4 files changed, 506 insertions(+), 20 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index 5a346c39c..9d550b66c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -163,8 +163,8 @@ public enum Type {
       case DOUBLE:
       case UNIXTIME_MICROS:
         return Longs.BYTES;
-      default: throw new IllegalArgumentException("The provided data type doesn't map" +
-          " to know any known one.");
+      default: throw new IllegalArgumentException(
+          "the provided data type doesn't map to any known one");
     }
   }
 
@@ -205,9 +205,8 @@ public enum Type {
       case DECIMAL128:
         return DECIMAL;
       default:
-        throw new IllegalArgumentException("The provided data type doesn't map" +
-            " to know any known one: " + type.getDescriptorForType().getFullName());
-
+        throw new IllegalArgumentException("the provided data type doesn't map " +
+            "to any known one: " + type.getDescriptorForType().getFullName());
     }
   }
 
@@ -228,4 +227,10 @@ public enum Type {
     throw new IllegalArgumentException("The provided name doesn't map to any known type: " + name);
   }
 
-}
+  /**
+   * @return true if this type has a pre-determined fixed size, false otherwise
+   */
+  public boolean isFixedSize() {
+    return this != BINARY && this != STRING && this != VARCHAR;
+  }
+}
\ No newline at end of file
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index fd3ec6997..0e3500e22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -145,15 +145,14 @@ class KeyEncoder {
       throw new IllegalStateException(String.format("Primary key column %s is not set",
                                                     column.getName()));
     }
-    final Type type = column.getType();
-    if (type == Type.STRING || type == Type.BINARY ||
-        type == Type.VARCHAR) {
-      encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
+    if (column.getType().isFixedSize()) {
+      encodeSignedInt(
+          row.getRowAlloc(),
+          schema.getColumnOffset(columnIdx),
+          column.getTypeSize(),
+          buf);
     } else {
-      encodeSignedInt(row.getRowAlloc(),
-                      schema.getColumnOffset(columnIdx),
-                      column.getTypeSize(),
-                      buf);
+      encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
     }
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 0ed9caca2..0b2b933fa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -407,8 +407,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         ColumnSchema col = schema.getColumnByIndex(colIdx);
         // Keys should always be specified, maybe check?
         if (row.isSet(colIdx) && !row.isSetToNull(colIdx)) {
-          if (col.getType() == Type.STRING || col.getType() == Type.BINARY ||
-              col.getType() == Type.VARCHAR) {
+          if (col.getType().isFixedSize()) {
+            rows.put(rowData, currentRowOffset, col.getTypeSize());
+          } else {
             ByteBuffer varLengthData = row.getVarLengthData().get(colIdx);
             varLengthData.reset();
             rows.putLong(indirectWrittenBytes);
@@ -416,9 +417,6 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
             rows.putLong(bbSize);
             indirect.add(varLengthData);
             indirectWrittenBytes += bbSize;
-          } else {
-            // This is for cols other than strings
-            rows.put(rowData, currentRowOffset, col.getTypeSize());
           }
         }
         currentRowOffset += col.getTypeSize();
@@ -482,4 +480,160 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
       return toPB();
     }
   }
+
+  static class OperationsDecoder {
+    private Schema schema;
+    private int columnBitmapSize;
+    private int columnCount;
+
+    /**
+     * Utility method to initialize internals of the OperationsDecoder class.
+     *
+     * @param schema table's schema used for encoding/decoding of the data
+     */
+    private void init(Schema schema) {
+      this.schema = schema;
+      this.columnCount = schema.getColumnCount();
+      this.columnBitmapSize = Bytes.getBitSetSize(this.columnCount);
+    }
+
+    /**
+     * Decode range partitions from the 'pb' message, assuming the data has been
+     * encoded using the table schema in 'schema'.
+     *
+     * @param pb the encoded data
+     * @param schema the table schema to use for decoding
+     * @return a list of PangePartition objects with corresponding bounds
+     */
+    public List<CreateTableOptions.RangePartition> decodeRangePartitions(
+        RowOperationsPB pb, Schema schema) {
+      if (pb == null) {
+        return null;
+      }
+      if (!pb.hasRows()) {
+        throw new IllegalArgumentException("row operation PB lacks 'rows' field");
+      }
+
+      init(schema);
+
+      ByteBuffer rowsBuf = ByteBuffer.wrap(pb.getRows().toByteArray());
+      rowsBuf.order(ByteOrder.LITTLE_ENDIAN);
+      final byte[] indirectData = pb.getIndirectData().toByteArray();
+
+      List<Pair<PartialRow, RowOperationsPB.Type>> decodedBounds =
+          new ArrayList<>();
+      while (rowsBuf.hasRemaining()) {
+        decodedBounds.add(decodeBound(rowsBuf, indirectData));
+      }
+
+      if (decodedBounds.size() % 2 != 0) {
+        throw new IllegalArgumentException(
+            "unexpected odd number of range partition bounds");
+      }
+
+      List<CreateTableOptions.RangePartition> result = new ArrayList<>();
+      for (int i = 0; i < decodedBounds.size(); i += 2) {
+        Pair<PartialRow, RowOperationsPB.Type> lower = decodedBounds.get(i);
+        Pair<PartialRow, RowOperationsPB.Type> upper = decodedBounds.get(i + 1);
+
+        RangePartitionBound lowerType;
+        if (lower.getSecond() == RowOperationsPB.Type.EXCLUSIVE_RANGE_LOWER_BOUND) {
+          lowerType = RangePartitionBound.EXCLUSIVE_BOUND;
+        } else if (lower.getSecond() == RowOperationsPB.Type.RANGE_LOWER_BOUND) {
+          lowerType = RangePartitionBound.INCLUSIVE_BOUND;
+        } else {
+          throw new IllegalArgumentException(String.format(
+              "%s: unexpected bound type for the lower bound", lower.getSecond().toString()));
+        }
+
+        RangePartitionBound upperType;
+        if (upper.getSecond() == RowOperationsPB.Type.INCLUSIVE_RANGE_UPPER_BOUND) {
+          upperType = RangePartitionBound.INCLUSIVE_BOUND;
+        } else if (upper.getSecond() == RowOperationsPB.Type.RANGE_UPPER_BOUND) {
+          upperType = RangePartitionBound.EXCLUSIVE_BOUND;
+        } else {
+          throw new IllegalArgumentException(String.format(
+              "%s: unexpected bound type for the upper bound", upper.getSecond().toString()));
+        }
+
+        result.add(new CreateTableOptions.RangePartition(
+            lower.getFirst(), upper.getFirst(), lowerType, upperType));
+      }
+      return result;
+    }
+
+    /**
+     * Decode lower and upper range bounds encoded with the RowOperationsPB
+     * conventions.
+     *
+     * @param rowsBuf byte buffer wrapping RowOperationsPB.rows
+     * @param indirectData byte array of the RowOperationsPB.indirect_data field
+     * @return a pair: decoded bound as PartialRow and the type of the bound
+     */
+    public Pair<PartialRow, RowOperationsPB.Type> decodeBound(
+        ByteBuffer rowsBuf, byte[] indirectData) {
+      RowOperationsPB.Type opType;
+      final byte opTypeEncoded = rowsBuf.get();
+      switch (opTypeEncoded) {
+        case RowOperationsPB.Type.EXCLUSIVE_RANGE_LOWER_BOUND_VALUE:
+          opType = RowOperationsPB.Type.EXCLUSIVE_RANGE_LOWER_BOUND;
+          break;
+        case RowOperationsPB.Type.RANGE_LOWER_BOUND_VALUE:
+          opType = RowOperationsPB.Type.RANGE_LOWER_BOUND;
+          break;
+        case RowOperationsPB.Type.INCLUSIVE_RANGE_UPPER_BOUND_VALUE:
+          opType = RowOperationsPB.Type.INCLUSIVE_RANGE_UPPER_BOUND;
+          break;
+        case RowOperationsPB.Type.RANGE_UPPER_BOUND_VALUE:
+          opType = RowOperationsPB.Type.RANGE_UPPER_BOUND;
+          break;
+        default:
+          throw new IllegalArgumentException(String.format(
+              "%d: unexpected operation type", opTypeEncoded));
+      }
+
+      // Read the 'isset' column bitmap.
+      byte[] columnsBitArray = new byte[columnBitmapSize];
+      rowsBuf.get(columnsBitArray);
+      BitSet columnsBitSet = Bytes.toBitSet(columnsBitArray, 0, 8 * columnBitmapSize);
+
+      // If present, read the 'null' column bitmap.
+      BitSet nullsBitSet = null;
+      if (schema.hasNullableColumns()) {
+        byte[] columnsNullArray = new byte[columnBitmapSize];
+        rowsBuf.get(columnsNullArray);
+        nullsBitSet = Bytes.toBitSet(columnsNullArray, 0, 8 * columnBitmapSize);
+      }
+
+      // Construct the PartialRow object to contain the boundary as decoded.
+      PartialRow resultRow = schema.newPartialRow();
+      for (int i = 0; i < columnsBitSet.size(); ++i) {
+        // Read the columns which has been set to a non-null value.
+        if (columnsBitSet.get(i) && (nullsBitSet == null || !nullsBitSet.get(i))) {
+          final ColumnSchema columnSchema = schema.getColumnByIndex(i);
+          if (columnSchema.getType().isFixedSize()) {
+            // The data for fixed-size types is read from the 'rowsBuf' buffer.
+            byte[] columnData = new byte[columnSchema.getTypeSize()];
+            rowsBuf.get(columnData);
+            resultRow.setRaw(i, columnData);
+          } else {
+            // The data for variable-size types is read from the 'indirectData'
+            // byte array.
+            final int indirectOffset = (int)rowsBuf.getLong();
+            final int indirectSize = (int)rowsBuf.getLong();
+
+            ByteBuffer auxBuf = ByteBuffer.wrap(
+                indirectData, indirectOffset, indirectSize);
+            auxBuf.order(ByteOrder.LITTLE_ENDIAN);
+
+            byte[] columnData = new byte[indirectSize];
+            auxBuf.get(columnData);
+            resultRow.setRaw(i, columnData);
+          }
+        }
+
+      }
+      return new Pair<PartialRow, RowOperationsPB.Type>(resultRow, opType);
+    }
+  }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
index 42c747ca0..8b6e0d792 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
@@ -18,9 +18,12 @@
 package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
@@ -35,6 +38,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.Operation.ChangeType;
 import org.apache.kudu.test.junit.RetryRule;
 import org.apache.kudu.tserver.Tserver.WriteRequestPBOrBuilder;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DateUtil;
 
 /**
@@ -46,7 +50,7 @@ public class TestOperation {
   public RetryRule retryRule = new RetryRule();
 
   private Schema createManyStringsSchema() {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(4);
+    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.STRING).key(true).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.STRING).build());
@@ -175,4 +179,328 @@ public class TestOperation {
       // Expected.
     }
   }
+
+  @Test
+  public void testEncodeDecodeRangeSimpleTypes() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT64).build());
+    final Schema schema = new Schema(columns);
+
+    final PartialRow lower = schema.newPartialRow();
+    lower.addInt("c0", 0);
+
+    final PartialRow upper = schema.newPartialRow();
+    upper.addInt("c0", 100);
+
+    final Operation.OperationsEncoder enc = new Operation.OperationsEncoder();
+    final RowOperationsPB encoded = enc.encodeLowerAndUpperBounds(
+        lower, upper, RangePartitionBound.INCLUSIVE_BOUND, RangePartitionBound.EXCLUSIVE_BOUND);
+
+    Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+    List<CreateTableOptions.RangePartition> decoded =
+        dec.decodeRangePartitions(encoded, schema);
+    assertEquals(1, decoded.size());
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(0).getLowerBoundType());
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(0).getUpperBoundType());
+    final PartialRow lowerDecoded = decoded.get(0).getLowerBound();
+    final PartialRow upperDecoded = decoded.get(0).getUpperBound();
+
+    assertTrue(lowerDecoded.isSet("c0"));
+    assertEquals(0, lowerDecoded.getInt("c0"));
+    assertFalse(lowerDecoded.isSet("c1"));
+    assertEquals(lower.toString(), lowerDecoded.toString());
+
+    assertTrue(upperDecoded.isSet("c0"));
+    assertEquals(100, upperDecoded.getInt("c0"));
+    assertFalse(upperDecoded.isSet("c1"));
+    assertEquals(upper.toString(), upperDecoded.toString());
+  }
+
+  @Test
+  public void testEncodeDecodeRangeStringTypes() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.VARCHAR)
+        .nullable(true)
+        .typeAttributes(CharUtil.typeAttributes(10))
+        .build());
+    final Schema schema = new Schema(columns);
+
+    final PartialRow lower = schema.newPartialRow();
+    lower.addString("c0", "a");
+
+    final PartialRow upper = schema.newPartialRow();
+    upper.addString("c0", "b");
+
+    final Operation.OperationsEncoder enc = new Operation.OperationsEncoder();
+    final RowOperationsPB encoded = enc.encodeLowerAndUpperBounds(
+        lower, upper, RangePartitionBound.INCLUSIVE_BOUND, RangePartitionBound.EXCLUSIVE_BOUND);
+
+    Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+    List<CreateTableOptions.RangePartition> decoded =
+        dec.decodeRangePartitions(encoded, schema);
+    assertEquals(1, decoded.size());
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(0).getLowerBoundType());
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(0).getUpperBoundType());
+    final PartialRow lowerDecoded = decoded.get(0).getLowerBound();
+    final PartialRow upperDecoded = decoded.get(0).getUpperBound();
+
+    assertTrue(lowerDecoded.isSet("c0"));
+    assertEquals("a", lowerDecoded.getString("c0"));
+    assertFalse(lowerDecoded.isSet("c1"));
+    assertFalse(lowerDecoded.isSet("c2"));
+    assertEquals(lower.toString(), lowerDecoded.toString());
+
+    assertTrue(upperDecoded.isSet("c0"));
+    assertEquals("b", upperDecoded.getString("c0"));
+    assertFalse(upperDecoded.isSet("c1"));
+    assertFalse(upperDecoded.isSet("c2"));
+    assertEquals(upper.toString(), upperDecoded.toString());
+  }
+
+  @Test
+  public void testEncodeDecodeRangeMixedTypes() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0i", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1s", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2i", Type.INT64).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3s", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c4i", Type.INT16).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c5s", Type.BINARY).nullable(true).build());
+    final Schema schema = new Schema(columns);
+
+    final PartialRow lower = schema.newPartialRow();
+    lower.addInt("c0i", 0);
+    lower.addString("c1s", "a");
+    lower.addLong("c2i", -10);
+    lower.addString("c3s", "A");
+    lower.addShort("c4i", (short)-100);
+
+    final PartialRow upper = schema.newPartialRow();
+    upper.addInt("c0i", 1);
+    upper.addString("c1s", "b");
+    upper.addLong("c2i", 10);
+    upper.addString("c3s", "B");
+    upper.addShort("c4i", (short)100);
+
+    final Operation.OperationsEncoder enc = new Operation.OperationsEncoder();
+    final RowOperationsPB encoded = enc.encodeLowerAndUpperBounds(
+        lower, upper, RangePartitionBound.INCLUSIVE_BOUND, RangePartitionBound.EXCLUSIVE_BOUND);
+
+    Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+    List<CreateTableOptions.RangePartition> decoded =
+        dec.decodeRangePartitions(encoded, schema);
+    assertEquals(1, decoded.size());
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(0).getLowerBoundType());
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(0).getUpperBoundType());
+    final PartialRow lowerDecoded = decoded.get(0).getLowerBound();
+    final PartialRow upperDecoded = decoded.get(0).getUpperBound();
+
+    assertTrue(lowerDecoded.isSet("c0i"));
+    assertEquals(0, lowerDecoded.getInt("c0i"));
+    assertTrue(lowerDecoded.isSet("c1s"));
+    assertEquals("a", lowerDecoded.getString("c1s"));
+    assertTrue(lowerDecoded.isSet("c2i"));
+    assertEquals(-10, lowerDecoded.getLong("c2i"));
+    assertTrue(lowerDecoded.isSet("c3s"));
+    assertEquals("A", lowerDecoded.getString("c3s"));
+    assertTrue(lowerDecoded.isSet("c4i"));
+    assertEquals(-100, lowerDecoded.getShort("c4i"));
+    assertFalse(lowerDecoded.isSet("c5s"));
+    assertEquals(lower.toString(), lowerDecoded.toString());
+
+    assertTrue(upperDecoded.isSet("c0i"));
+    assertEquals(1, upperDecoded.getInt("c0i"));
+    assertTrue(upperDecoded.isSet("c1s"));
+    assertEquals("b", upperDecoded.getString("c1s"));
+    assertTrue(upperDecoded.isSet("c2i"));
+    assertEquals(10, upperDecoded.getLong("c2i"));
+    assertTrue(upperDecoded.isSet("c3s"));
+    assertEquals("B", upperDecoded.getString("c3s"));
+    assertTrue(upperDecoded.isSet("c4i"));
+    assertEquals(100, upperDecoded.getShort("c4i"));
+    assertFalse(upperDecoded.isSet("c5s"));
+    assertEquals(upper.toString(), upperDecoded.toString());
+  }
+
+  @Test
+  public void testEncodeDecodeMultipleRangePartitions() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT64).build());
+    final Schema schema = new Schema(columns);
+
+    List<CreateTableOptions.RangePartition> rangePartitions = new ArrayList<>();
+    {
+      final PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 0);
+
+      final PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      rangePartitions.add(new CreateTableOptions.RangePartition(
+          lower,
+          upper,
+          RangePartitionBound.INCLUSIVE_BOUND,
+          RangePartitionBound.EXCLUSIVE_BOUND));
+    }
+    {
+      final PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 200);
+
+      final PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 300);
+      rangePartitions.add(new CreateTableOptions.RangePartition(
+          lower,
+          upper,
+          RangePartitionBound.EXCLUSIVE_BOUND,
+          RangePartitionBound.INCLUSIVE_BOUND));
+    }
+
+    final Operation.OperationsEncoder enc = new Operation.OperationsEncoder();
+    final RowOperationsPB encoded = enc.encodeRangePartitions(
+        rangePartitions, ImmutableList.of());
+
+    Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+    List<CreateTableOptions.RangePartition> decoded =
+        dec.decodeRangePartitions(encoded, schema);
+    assertEquals(2, decoded.size());
+
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(0).getLowerBoundType());
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(0).getUpperBoundType());
+    {
+      final PartialRow lowerDecoded = decoded.get(0).getLowerBound();
+      final PartialRow upperDecoded = decoded.get(0).getUpperBound();
+
+      assertTrue(lowerDecoded.isSet("c0"));
+      assertEquals(0, lowerDecoded.getInt("c0"));
+      assertFalse(lowerDecoded.isSet("c1"));
+
+      assertTrue(upperDecoded.isSet("c0"));
+      assertEquals(100, upperDecoded.getInt("c0"));
+      assertFalse(upperDecoded.isSet("c1"));
+    }
+
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(1).getLowerBoundType());
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(1).getUpperBoundType());
+    {
+      final PartialRow lowerDecoded = decoded.get(1).getLowerBound();
+      final PartialRow upperDecoded = decoded.get(1).getUpperBound();
+
+      assertTrue(lowerDecoded.isSet("c0"));
+      assertEquals(200, lowerDecoded.getInt("c0"));
+      assertFalse(lowerDecoded.isSet("c1"));
+
+      assertTrue(upperDecoded.isSet("c0"));
+      assertEquals(300, upperDecoded.getInt("c0"));
+      assertFalse(upperDecoded.isSet("c1"));
+    }
+  }
+
+  @Test
+  public void testEncodeDecodeMultipleRangePartitionsNullableColumns() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.INT64).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", Type.STRING).nullable(true).build());
+    final Schema schema = new Schema(columns);
+
+    List<CreateTableOptions.RangePartition> rangePartitions = new ArrayList<>();
+    {
+      final PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 0);
+      lower.addString("c1", "a");
+
+      final PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      upper.addString("c1", "c");
+      rangePartitions.add(new CreateTableOptions.RangePartition(
+          lower,
+          upper,
+          RangePartitionBound.INCLUSIVE_BOUND,
+          RangePartitionBound.EXCLUSIVE_BOUND));
+    }
+    {
+      final PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 200);
+      lower.addString("c1", "e");
+
+      final PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 300);
+      upper.addString("c1", "f");
+      rangePartitions.add(new CreateTableOptions.RangePartition(
+          lower,
+          upper,
+          RangePartitionBound.EXCLUSIVE_BOUND,
+          RangePartitionBound.INCLUSIVE_BOUND));
+    }
+
+    final Operation.OperationsEncoder enc = new Operation.OperationsEncoder();
+    final RowOperationsPB encoded = enc.encodeRangePartitions(
+        rangePartitions, ImmutableList.of());
+
+    Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+    List<CreateTableOptions.RangePartition> decoded =
+        dec.decodeRangePartitions(encoded, schema);
+    assertEquals(2, decoded.size());
+
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(0).getLowerBoundType());
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(0).getUpperBoundType());
+    {
+      final PartialRow lowerDecoded = decoded.get(0).getLowerBound();
+      final PartialRow upperDecoded = decoded.get(0).getUpperBound();
+
+      assertTrue(lowerDecoded.isSet("c0"));
+      assertEquals(0, lowerDecoded.getInt("c0"));
+      assertTrue(lowerDecoded.isSet("c1"));
+      assertEquals("a", lowerDecoded.getString("c1"));
+      assertFalse(lowerDecoded.isSet("c2"));
+      assertFalse(lowerDecoded.isSet("c3"));
+
+      assertTrue(upperDecoded.isSet("c0"));
+      assertEquals(100, upperDecoded.getInt("c0"));
+      assertTrue(upperDecoded.isSet("c1"));
+      assertEquals("c", upperDecoded.getString("c1"));
+      assertFalse(upperDecoded.isSet("c2"));
+      assertFalse(upperDecoded.isSet("c3"));
+    }
+
+
+    assertEquals(RangePartitionBound.EXCLUSIVE_BOUND,
+        decoded.get(1).getLowerBoundType());
+    assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
+        decoded.get(1).getUpperBoundType());
+    {
+      final PartialRow lowerDecoded = decoded.get(1).getLowerBound();
+      final PartialRow upperDecoded = decoded.get(1).getUpperBound();
+
+      assertTrue(lowerDecoded.isSet("c0"));
+      assertEquals(200, lowerDecoded.getInt("c0"));
+      assertTrue(lowerDecoded.isSet("c1"));
+      assertEquals("e", lowerDecoded.getString("c1"));
+      assertFalse(lowerDecoded.isSet("c2"));
+      assertFalse(lowerDecoded.isSet("c3"));
+
+      assertTrue(upperDecoded.isSet("c0"));
+      assertEquals(300, upperDecoded.getInt("c0"));
+      assertTrue(upperDecoded.isSet("c1"));
+      assertEquals("f", upperDecoded.getString("c1"));
+      assertFalse(upperDecoded.isSet("c2"));
+      assertFalse(upperDecoded.isSet("c3"));
+    }
+  }
 }