You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/09/08 23:42:39 UTC

[1/2] kudu git commit: KUDU-1065: [java client] Flexible Partition Pruning

Repository: kudu
Updated Branches:
  refs/heads/master e33bac441 -> c7dab48eb


http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
new file mode 100644
index 0000000..b065cdb
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestPartitionPruner extends BaseKuduTest {
+
+  /**
+   * Counts the partitions touched by a scan with optional primary key bounds.
+   * The table is assumed to have three INT8 columns as the primary key.
+   *
+   * @param table the table to scan
+   * @param partitions the partitions of the table
+   * @param lowerBoundPrimaryKey the optional lower bound primary key
+   * @param upperBoundPrimaryKey the optional upper bound primary key
+   * @return the number of partitions touched by the scan
+   */
+  private int countPartitionsPrimaryKey(KuduTable table,
+                                        List<Partition> partitions,
+                                        byte[] lowerBoundPrimaryKey,
+                                        byte[] upperBoundPrimaryKey) throws Exception {
+    KuduScanToken.KuduScanTokenBuilder scanBuilder = syncClient.newScanTokenBuilder(table);
+
+    if (lowerBoundPrimaryKey != null) {
+      PartialRow lower = table.getSchema().newPartialRow();
+      for (int i = 0; i < 3; i++) {
+        lower.addByte(i, lowerBoundPrimaryKey[i]);
+      }
+      scanBuilder.lowerBound(lower);
+    }
+
+    if (upperBoundPrimaryKey != null) {
+      PartialRow upper = table.getSchema().newPartialRow();
+      for (int i = 0; i < 3; i++) {
+        upper.addByte(i, upperBoundPrimaryKey[i]);
+      }
+      scanBuilder.exclusiveUpperBound(upper);
+    }
+
+    PartitionPruner pruner = PartitionPruner.create(scanBuilder);
+
+    int scannedPartitions = 0;
+    for (Partition partition : partitions) {
+      if (!pruner.shouldPrune(partition)) scannedPartitions++;
+    }
+
+    // Check that the number of ScanTokens built for the scan matches.
+    assertEquals(scannedPartitions, scanBuilder.build().size());
+    return scannedPartitions;
+  }
+
+  /**
+   * Counts the partitions touched by a scan with predicates.
+   *
+   * @param table the table to scan
+   * @param partitions the partitions of the table
+   * @param predicates the predicates to apply to the scan
+   * @return the number of partitions touched by the scan
+   */
+  private int countPartitions(KuduTable table,
+                              List<Partition> partitions,
+                              KuduPredicate... predicates) throws Exception {
+    KuduScanToken.KuduScanTokenBuilder scanBuilder = syncClient.newScanTokenBuilder(table);
+
+    for (KuduPredicate predicate : predicates) {
+      scanBuilder.addPredicate(predicate);
+    }
+
+    PartitionPruner pruner = PartitionPruner.create(scanBuilder);
+
+    int scannedPartitions = 0;
+    for (Partition partition : partitions) {
+      if (!pruner.shouldPrune(partition)) scannedPartitions++;
+    }
+
+    // Check that the number of ScanTokens built for the scan matches.
+    assertEquals(scannedPartitions, scanBuilder.build().size());
+    return scannedPartitions;
+  }
+
+  /**
+   * Counts the partitions touched by a scan with predicates and optional partition key bounds.
+   *
+   * @param table the table to scan
+   * @param partitions the partitions of the table
+   * @param lowerBoundPartitionKey an optional lower bound partition key
+   * @param upperBoundPartitionKey an optional upper bound partition key
+   * @param predicates the predicates to apply to the scan
+   * @return the number of partitions touched by the scan
+   */
+  private int countPartitions(KuduTable table,
+                              List<Partition> partitions,
+                              byte[] lowerBoundPartitionKey,
+                              byte[] upperBoundPartitionKey,
+                              KuduPredicate... predicates) throws Exception {
+    // Partition key bounds can't be applied to the ScanTokenBuilder.
+    KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
+
+    for (KuduPredicate predicate : predicates) {
+      scanBuilder.addPredicate(predicate);
+    }
+
+    if (lowerBoundPartitionKey != null) {
+      scanBuilder.lowerBoundPartitionKeyRaw(lowerBoundPartitionKey);
+    }
+    if (upperBoundPartitionKey != null) {
+      scanBuilder.exclusiveUpperBoundPartitionKeyRaw(upperBoundPartitionKey);
+    }
+
+    PartitionPruner pruner = PartitionPruner.create(scanBuilder);
+
+    int scannedPartitions = 0;
+    for (Partition partition : partitions) {
+      if (!pruner.shouldPrune(partition)) scannedPartitions++;
+    }
+
+    return scannedPartitions;
+  }
+
+  /**
+   * Retrieves the partitions of a table.
+   *
+   * @param table the table
+   * @return the partitions of the table
+   */
+  private List<Partition> getTablePartitions(KuduTable table) {
+    List<Partition> partitions = new ArrayList<>();
+    for (KuduScanToken token : syncClient.newScanTokenBuilder(table).build()) {
+      partitions.add(token.getTablet().getPartition());
+    }
+    return partitions;
+  }
+
+  @Test
+  public void testPrimaryKeyRangePruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b INT8, c INT8)
+    // PRIMARY KEY (a, b, c))
+    // PARTITION BY RANGE (a, b, c)
+    //    (PARTITION                 VALUES < (0, 0, 0),
+    //     PARTITION    (0, 0, 0) <= VALUES < (10, 10, 10)
+    //     PARTITION (10, 10, 10) <= VALUES);
+
+    ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build());
+    Schema schema = new Schema(columns);
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("a", "b", "c"));
+
+    PartialRow split = schema.newPartialRow();
+    split.addByte("a", (byte) 0);
+    split.addByte("b", (byte) 0);
+    split.addByte("c", (byte) 0);
+    tableBuilder.addSplitRow(split);
+    split.addByte("a", (byte) 10);
+    split.addByte("b", (byte) 10);
+    split.addByte("c", (byte) 10);
+    tableBuilder.addSplitRow(split);
+
+    String tableName = "testPrimaryKeyRangePruning-" + System.currentTimeMillis();
+
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    byte min = Byte.MIN_VALUE;
+
+    // No bounds
+    assertEquals(3, countPartitionsPrimaryKey(table, partitions, null, null));
+
+    // PK < (-1, min, min)
+    assertEquals(1, countPartitionsPrimaryKey(table, partitions, null,
+                                              new byte[] { -1, min, min }));
+
+    // PK < (10, 10, 10)
+    assertEquals(2, countPartitionsPrimaryKey(table, partitions, null, new byte[] { 10, 10, 10 }));
+
+    // PK < (100, min, min)
+    assertEquals(3, countPartitionsPrimaryKey(table, partitions, null, new byte[] { 100, min, min }));
+
+    // PK >= (-10, -10, -10)
+    assertEquals(3, countPartitionsPrimaryKey(table, partitions, new byte[] { -10, -10, -10 }, null));
+
+    // PK >= (0, 0, 0)
+    assertEquals(2, countPartitionsPrimaryKey(table, partitions, new byte[] { 0, 0, 0 }, null));
+
+    // PK >= (100, 0, 0)
+    assertEquals(1, countPartitionsPrimaryKey(table, partitions, new byte[] { 100, 0, 0 }, null));
+
+    // PK >= (-10, 0, 0)
+    // PK  < (100, 0, 0)
+    assertEquals(3, countPartitionsPrimaryKey(table, partitions,
+                                              new byte[] { -10, 0, 0 },
+                                              new byte[] { 100, 0, 0 }));
+
+    // PK >= (0, 0, 0)
+    // PK  < (10, 10, 10)
+    assertEquals(1, countPartitionsPrimaryKey(table, partitions,
+                                              new byte[] { 0, 0, 0 },
+                                              new byte[] { 10, 0, 0 }));
+
+    // PK >= (0, 0, 0)
+    // PK  < (10, 10, 11)
+    assertEquals(1, countPartitionsPrimaryKey(table, partitions,
+                                              new byte[] { 0, 0, 0 },
+                                              new byte[] { 10, 0, 0 }));
+
+    // PK < (0, 0, 0)
+    // PK >= (10, 10, 11)
+    assertEquals(0, countPartitionsPrimaryKey(table, partitions,
+                                              new byte[] { 10, 0, 0 },
+                                              new byte[] { 0, 0, 0 }));
+  }
+
+  @Test
+  public void testRangePartitionPruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b STRING, c INT8)
+    // PRIMARY KEY (a, b, c))
+    // DISTRIBUTE BY RANGE(c, b);
+    // PARTITION BY RANGE (a, b, c)
+    //    (PARTITION              VALUES < (0, "m"),
+    //     PARTITION  (0, "m") <= VALUES < (10, "r")
+    //     PARTITION (10, "r") <= VALUES);
+
+    ColumnSchema a = new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build();
+    ColumnSchema b = new ColumnSchema.ColumnSchemaBuilder("b", Type.STRING).key(true).build();
+    ColumnSchema c = new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build();
+    Schema schema = new Schema(ImmutableList.of(a, b, c));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("c", "b"));
+
+    PartialRow split = schema.newPartialRow();
+    split.addByte("c", (byte) 0);
+    split.addString("b", "m");
+    tableBuilder.addSplitRow(split);
+    split.addByte("c", (byte) 10);
+    split.addString("b", "r");
+    tableBuilder.addSplitRow(split);
+
+    String tableName = "testRangePartitionPruning-" + System.currentTimeMillis();
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    // No Predicates
+    assertEquals(3, countPartitions(table, partitions));
+
+    // c < -10
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, -10)));
+
+    // c = -10
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, -10)));
+
+    // c < 10
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, 10)));
+
+    // c < 100
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, 100)));
+
+
+    // c >= -10
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, -10)));
+
+    // c >= 0
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, -10)));
+
+    // c >= 5
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 5)));
+
+    // c >= 10
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 10)));
+
+    // c >= 100
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 100)));
+
+    // c >= -10
+    // c < 0
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, -10),
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, 0)));
+
+    // c >= 5
+    // c < 100
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 5),
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, 100)));
+
+    // b = ""
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.EQUAL, "")));
+
+    // b >= "z"
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.GREATER_EQUAL, "z")));
+
+    // b < "a"
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "a")));
+
+    // b >= "m"
+    // b < "z"
+    assertEquals(3, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.GREATER_EQUAL, "m"),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "z")));
+
+    // c >= 10
+    // b >= "r"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 10),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.GREATER_EQUAL, "r")));
+
+    // c >= 10
+    // b < "r"
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.GREATER_EQUAL, 10),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "r")));
+
+    // c = 10
+    // b < "r"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 10),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "r")));
+
+    // c < 0
+    // b < "m"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "m")));
+
+    // c < 0
+    // b < "z"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.LESS, 0),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "z")));
+
+    // c = 0
+    // b = "m\0"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.EQUAL, "m\0")));
+
+    // c = 0
+    // b < "m"
+    assertEquals(1, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "m")));
+
+    // c = 0
+    // b < "m\0"
+    assertEquals(2, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.LESS, "m\0")));
+
+    // c = 0
+    // c = 2
+    assertEquals(0, countPartitions(table, partitions,
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 2)));
+  }
+
+  @Test
+  public void testHashPartitionPruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b INT8, c INT8)
+    // PRIMARY KEY (a, b, c)
+    // PARTITION BY HASH (a) PARTITIONS 2,
+    //              HASH (b, c) PARTITIONS 2;
+
+    ColumnSchema a = new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build();
+    ColumnSchema b = new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build();
+    ColumnSchema c = new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build();
+    Schema schema = new Schema(ImmutableList.of(a, b, c));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(new ArrayList<String>());
+    tableBuilder.addHashPartitions(ImmutableList.of("a"), 2);
+    tableBuilder.addHashPartitions(ImmutableList.of("b", "c"), 2);
+
+    String tableName = "testHashPartitionPruning-" + System.currentTimeMillis();
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    // No Predicates
+    assertEquals(4, countPartitions(table, partitions));
+
+    // a = 0;
+    assertEquals(2, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.EQUAL, 0)));
+
+    // a >= 0;
+    assertEquals(4, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.GREATER_EQUAL, 0)));
+
+    // a >= 0;
+    // a < 1;
+    assertEquals(2, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.GREATER_EQUAL, 0),
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.LESS, 1)));
+
+    // a >= 0;
+    // a < 2;
+    assertEquals(4, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.GREATER_EQUAL, 0),
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.LESS, 2)));
+
+    // b = 1;
+    assertEquals(4, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.EQUAL, 1)));
+
+    // b = 1;
+    // c = 2;
+    assertEquals(2, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.EQUAL, 1),
+          KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 2)));
+
+    // a = 0;
+    // b = 1;
+    // c = 2;
+    assertEquals(1, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(a, KuduPredicate.ComparisonOp.EQUAL, 0),
+          KuduPredicate.newComparisonPredicate(b, KuduPredicate.ComparisonOp.EQUAL, 1),
+          KuduPredicate.newComparisonPredicate(c, KuduPredicate.ComparisonOp.EQUAL, 2)));
+  }
+
+  @Test
+  public void testPruning() throws Exception {
+    // CREATE TABLE timeseries
+    // (host STRING, metric STRING, timestamp TIMESTAMP, value DOUBLE)
+    // PRIMARY KEY (host, metric, time)
+    // DISTRIBUTE BY
+    //    RANGE(time) SPLIT ROWS [(10)],
+    //        (PARTITION       VALUES < 10,
+    //         PARTITION 10 <= VALUES);
+    //    HASH (host, metric) 2 PARTITIONS;
+
+    ColumnSchema host = new ColumnSchema.ColumnSchemaBuilder("host", Type.STRING).key(true).build();
+    ColumnSchema metric = new ColumnSchema.ColumnSchemaBuilder("metric", Type.STRING).key(true).build();
+    ColumnSchema timestamp = new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.TIMESTAMP).key(true).build();
+    ColumnSchema value = new ColumnSchema.ColumnSchemaBuilder("value", Type.DOUBLE).build();
+    Schema schema = new Schema(ImmutableList.of(host, metric, timestamp, value));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("timestamp"));
+
+    PartialRow split = schema.newPartialRow();
+    split.addLong("timestamp", 10);
+    tableBuilder.addSplitRow(split);
+
+    tableBuilder.addHashPartitions(ImmutableList.of("host", "metric"), 2);
+
+    String tableName = "testPruning-" + System.currentTimeMillis();
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    // No Predicates
+    assertEquals(4, countPartitions(table, partitions));
+
+    // host = "a"
+    assertEquals(4, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a")));
+
+    // host = "a"
+    // metric = "a"
+    assertEquals(2, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a")));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 9;
+    assertEquals(2, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.GREATER_EQUAL, 9)));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 10;
+    // timestamp < 20;
+    assertEquals(1, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.GREATER_EQUAL, 10),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.LESS, 20)));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp < 10;
+    assertEquals(1, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.LESS, 10)));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 10;
+    assertEquals(1, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.GREATER_EQUAL, 10)));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp = 10;
+    assertEquals(1, countPartitions(table, partitions,
+          KuduPredicate.newComparisonPredicate(host, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(metric, KuduPredicate.ComparisonOp.EQUAL, "a"),
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.EQUAL, 10)));
+
+    // partition key < (hash=1)
+    assertEquals(2, countPartitions(table, partitions, new byte[] {}, new byte[] { 0, 0, 0, 1 }));
+
+    // partition key >= (hash=1)
+    assertEquals(2, countPartitions(table, partitions, new byte[] { 0, 0, 0, 1 }, new byte[] {}));
+
+    // timestamp = 10
+    // partition key < (hash=1)
+    assertEquals(1, countPartitions(table, partitions, new byte[] {}, new byte[] { 0, 0, 0, 1 },
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.EQUAL, 10)));
+
+    // timestamp = 10
+    // partition key >= (hash=1)
+    assertEquals(1, countPartitions(table, partitions, new byte[] { 0, 0, 0, 1 }, new byte[] {},
+          KuduPredicate.newComparisonPredicate(timestamp, KuduPredicate.ComparisonOp.EQUAL, 10)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/test/java/org/apache/kudu/util/TestByteVec.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestByteVec.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestByteVec.java
new file mode 100644
index 0000000..a1720ee
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestByteVec.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+public class TestByteVec {
+  private static final Random RAND = new Random();
+
+  private void assertBytesEqual(byte a, byte b) {
+    if (a != b) throw new AssertionError(String.format("%s != %s", a, b));
+  }
+
+  private List<Byte> random() {
+    return random(RAND.nextInt(1024));
+  }
+
+  private List<Byte> random(int len) {
+    List<Byte> list = new ArrayList<>();
+    for (int i = 0; i < len; i++) {
+      list.add((byte) RAND.nextInt(i + 1));
+    }
+    return Collections.unmodifiableList(list);
+  }
+
+  private void checkByteVec(List<Byte> vals) {
+    ByteVec vec = ByteVec.create();
+    assertEquals(0, vec.len());
+
+    // push
+    for (byte i : vals) {
+      vec.push(i);
+    }
+    assertEquals(vals, vec.asList());
+
+    // withCapacity
+    assertEquals(0, ByteVec.withCapacity(0).capacity());
+    assertEquals(13, ByteVec.withCapacity(13).capacity());
+
+    // wrap
+    assertEquals(vec, ByteVec.wrap(vec.toArray()));
+
+    // clone, equals
+    ByteVec copy = vec.clone();
+    assertEquals(copy, vec);
+
+    // truncate
+    copy.truncate(vec.len() + 1);
+    assertEquals(vals, copy.asList());
+    vec.truncate(copy.len());
+    assertEquals(vals, copy.asList());
+    copy.truncate(vals.size() / 2);
+    assertEquals(vals.subList(0, vals.size() / 2), copy.asList());
+    if (vals.size() > 0) {
+      assertNotEquals(vals, copy.asList());
+    }
+
+    // reserve
+    int unused = copy.capacity() - copy.len();
+
+    copy.reserve(unused);
+    assertEquals(vec.capacity(), copy.capacity());
+
+    copy.reserve(unused + 1);
+    assertTrue(copy.capacity() > vec.capacity());
+
+    // reserveExact
+    unused = copy.capacity() - copy.len();
+    copy.reserveExact(unused + 3);
+    assertEquals(copy.capacity() - copy.len(), unused + 3);
+
+    copy.truncate(0);
+    assertEquals(0, copy.len());
+
+    // shrinkToFit
+    copy.shrinkToFit();
+    assertEquals(0, copy.capacity());
+    vec.shrinkToFit();
+    assertEquals(vec.len(), vec.capacity());
+
+    // get
+    for (int i = 0; i < vals.size(); i++) {
+      assertBytesEqual(vals.get(i), vec.get(i));
+    }
+
+    // set
+    if (vec.len() > 0) {
+      copy = vec.clone();
+      int index = RAND.nextInt(vec.len());
+      copy.set(index, (byte) index);
+      List<Byte> intsCopy = new ArrayList<>(vals);
+      intsCopy.set(index, (byte) index);
+      assertEquals(intsCopy, copy.asList());
+    }
+  }
+
+  @Test
+  public void testByteVec() throws Exception {
+    checkByteVec(random(0));
+    checkByteVec(random(1));
+    checkByteVec(random(2));
+    checkByteVec(random(3));
+    checkByteVec(random(ByteVec.DEFAULT_CAPACITY - 2));
+    checkByteVec(random(ByteVec.DEFAULT_CAPACITY - 1));
+    checkByteVec(random(ByteVec.DEFAULT_CAPACITY));
+    checkByteVec(random(ByteVec.DEFAULT_CAPACITY + 1));
+    checkByteVec(random(ByteVec.DEFAULT_CAPACITY + 2));
+
+    for (int i = 0; i < 100; i++) {
+      checkByteVec(random());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/test/resources/flags
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/flags b/java/kudu-client/src/test/resources/flags
index 34b73cb..687676f 100644
--- a/java/kudu-client/src/test/resources/flags
+++ b/java/kudu-client/src/test/resources/flags
@@ -1,4 +1,3 @@
---v=1
 --logtostderr
 --never_fsync
 --unlock_experimental_flags

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 72aecf9..9957ee0 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -44,10 +44,10 @@ using std::vector;
 namespace kudu {
 
 void CheckPrunedPartitions(const Schema& schema,
-                            const PartitionSchema& partition_schema,
-                            const vector<Partition> partitions,
-                            const ScanSpec& spec,
-                            size_t remaining_tablets) {
+                           const PartitionSchema& partition_schema,
+                           const vector<Partition> partitions,
+                           const ScanSpec& spec,
+                           size_t remaining_tablets) {
 
   PartitionPruner pruner;
   pruner.Init(schema, partition_schema, spec);
@@ -369,10 +369,6 @@ TEST(TestPartitionPruner, TestRangePruning) {
   // c < 0
   Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
 
-  // c >= -10
-  // c < 0
-  Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
-
   // c >= 5
   // c < 100
   Check({ ColumnPredicate::Range(schema.column(2), &five, &hundred) }, 2);
@@ -441,7 +437,7 @@ TEST(TestPartitionPruner, TestRangePruning) {
 
 TEST(TestPartitionPruner, TestHashPruning) {
   // CREATE TABLE t
-  // (a INT8, b STRING, c INT8)
+  // (a INT8, b INT8, c INT8)
   // PRIMARY KEY (a, b, c)
   // DISTRIBUTE BY HASH(a) INTO 2 BUCKETS,
   //               HASH(b, c) INTO 2 BUCKETS;
@@ -523,7 +519,7 @@ TEST(TestPartitionPruner, TestPruning) {
   // CREATE TABLE timeseries
   // (host STRING, metric STRING, time TIMESTAMP, value DOUBLE)
   // PRIMARY KEY (host, metric, time)
-  // DISTRIBUTE BY RANGE(time) SPLIT ROWS [(5), (10)],
+  // DISTRIBUTE BY RANGE(time) SPLIT ROWS [(10)],
   //               HASH(host, metric) INTO 2 BUCKETS;
   Schema schema({ ColumnSchema("host", STRING),
                   ColumnSchema("metric", STRING),
@@ -604,10 +600,10 @@ TEST(TestPartitionPruner, TestPruning) {
 
   // host = "a"
   // metric = "a"
-  // timestamp = 10;
+  // timestamp >= 10;
   Check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
-          ColumnPredicate::Equality(schema.column(2), &ten) },
+          ColumnPredicate::Range(schema.column(2), &ten, nullptr) },
         "", "",
         1);
 
@@ -626,12 +622,12 @@ TEST(TestPartitionPruner, TestPruning) {
   // partition key >= (hash=1)
   Check({}, string("\0\0\0\1", 4), "", 2);
 
-  // a = 10
+  // timestamp = 10
   // partition key < (hash=1)
   Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
         "", string("\0\0\0\1", 4), 1);
 
-  // a = 10
+  // timestamp = 10
   // partition key >= (hash=1)
   Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
         string("\0\0\0\1", 4), "", 1);


[2/2] kudu git commit: KUDU-1065: [java client] Flexible Partition Pruning

Posted by ad...@apache.org.
KUDU-1065: [java client] Flexible Partition Pruning

This commit introduces an internal utility ByteVec class which is a
mashup of the C++ std::string / Rust Vec<u8> types. KeyEncoder has been
transitioned to use this type instead of ByteArrayOutputStream. The
partition pruning algorithm incrementally builds up partition keys from
predicates, and requires cloning the keys as they are being built in
order to multiply over hash partition buckets. ByteArrayOutputStream
doesn't have a clone method. ByteArrayOutputStream is also synchronized
internally, which is dumb. Thus begat ByteVec.

This version of partition pruning only looks at predicates when
determining which partitions to prune. Constraints in the primary key
bounds are not considered, unless the table is range partitioned over
the primary key columns and not hash partitioned (simple partitioning).
This limits the pruned partitions in some pretty rare cases, but the
workaround of explicitly setting the predicate is not too onerous.

Finally, this commit changes the default test flags to remove mini
cluster verbose logging, since it is extremely noisy.

Change-Id: Ib27b54841d87cf854175ab8cdfa8798b337d71f9
Reviewed-on: http://gerrit.cloudera.org:8080/4299
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c7dab48e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c7dab48e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c7dab48e

Branch: refs/heads/master
Commit: c7dab48ebc1dd05ff05a872ed7c3ab2a92f72a0d
Parents: e33bac4
Author: Dan Burkert <da...@cloudera.com>
Authored: Fri Aug 19 13:30:52 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Sep 8 23:42:17 2016 +0000

----------------------------------------------------------------------
 .../kudu/client/AbstractKuduScannerBuilder.java |   8 +-
 .../org/apache/kudu/client/AsyncKuduClient.java |   1 -
 .../apache/kudu/client/AsyncKuduScanner.java    |  96 +---
 .../java/org/apache/kudu/client/KeyEncoder.java | 169 +++---
 .../org/apache/kudu/client/KuduPredicate.java   |  14 +
 .../org/apache/kudu/client/KuduScanToken.java   |  25 +-
 .../org/apache/kudu/client/KuduScanner.java     |   3 +-
 .../java/org/apache/kudu/client/KuduTable.java  |   2 +-
 .../java/org/apache/kudu/client/PartialRow.java | 101 +++-
 .../org/apache/kudu/client/PartitionPruner.java | 475 ++++++++++++++++
 .../org/apache/kudu/client/PartitionSchema.java |   2 +-
 .../main/java/org/apache/kudu/util/ByteVec.java | 288 ++++++++++
 .../org/apache/kudu/client/TestKeyEncoding.java |  10 +-
 .../apache/kudu/client/TestPartitionPruner.java | 567 +++++++++++++++++++
 .../java/org/apache/kudu/util/TestByteVec.java  | 134 +++++
 java/kudu-client/src/test/resources/flags       |   1 -
 src/kudu/common/partition_pruner-test.cc        |  24 +-
 17 files changed, 1722 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 9b648c3..17bc66d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -308,12 +308,12 @@ public abstract class AbstractKuduScannerBuilder
 
   /**
    * Set an encoded (inclusive) start partition key for the scan.
+   * Not for public consumption: use either predicates or primary key bounds instead.
    *
    * @param partitionKey the encoded partition key
    * @return this instance
    */
-  @InterfaceAudience.LimitedPrivate("Impala")
-  public S lowerBoundPartitionKeyRaw(byte[] partitionKey) {
+  S lowerBoundPartitionKeyRaw(byte[] partitionKey) {
     if (Bytes.memcmp(partitionKey, lowerBoundPartitionKey) > 0) {
       this.lowerBoundPartitionKey = partitionKey;
     }
@@ -322,12 +322,12 @@ public abstract class AbstractKuduScannerBuilder
 
   /**
    * Set an encoded (exclusive) end partition key for the scan.
+   * Not for public consumption: use either predicates or primary key bounds instead.
    *
    * @param partitionKey the encoded partition key
    * @return this instance
    */
-  @InterfaceAudience.LimitedPrivate("Impala")
-  public S exclusiveUpperBoundPartitionKeyRaw(byte[] partitionKey) {
+  S exclusiveUpperBoundPartitionKeyRaw(byte[] partitionKey) {
     if (upperBoundPartitionKey.length == 0 || Bytes.memcmp(partitionKey, upperBoundPartitionKey) < 0) {
       this.upperBoundPartitionKey = partitionKey;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 70b06ed..c42bd2f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1229,7 +1229,6 @@ public class AsyncKuduClient implements AutoCloseable {
    *         get called back, if deadline is reached, the deferred result will get erred back
    * @return a deferred object that yields a list of the tablets in the table, which can be queried
    *         for metadata about each tablet
-   * @throws Exception MasterErrorException if the table doesn't exist
    */
   Deferred<List<LocatedTablet>> locateTable(final KuduTable table,
                                             final byte[] startPartitionKey,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 20cf9f2..55a9fbb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -45,7 +45,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.kudu.tserver.Tserver.NewScanRequestPB;
@@ -60,8 +59,7 @@ import static org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
  * used from a single thread at a time. It's rarely (if ever?) useful to
  * scan concurrently from a shared scanner using multiple threads. If you
  * want to optimize large table scans using extra parallelism, create a few
- * scanners and give each of them a partition of the table to scan. Or use
- * MapReduce.
+ * scanners through the {@link KuduScanToken} API. Or use MapReduce.
  * <p>
  * There's no method in this class to explicitly open the scanner. It will open
  * itself automatically when you start scanning by calling {@link #nextRows()}.
@@ -140,6 +138,8 @@ public final class AsyncKuduScanner {
   private final KuduTable table;
   private final Schema schema;
 
+  private final PartitionPruner pruner;
+
   /**
    * Map of column name to predicate.
    */
@@ -156,18 +156,6 @@ public final class AsyncKuduScanner {
   private final long limit;
 
   /**
-   * The start partition key of the next tablet to scan.
-   *
-   * Each time the scan exhausts a tablet, this is updated to that tablet's end partition key.
-   */
-  private byte[] nextPartitionKey;
-
-  /**
-   * The end partition key of the last tablet to scan.
-   */
-  private final byte[] endPartitionKey;
-
-  /**
    * Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
    * It is then reset to the new start primary key of each tablet we open a scanner on as the scan
    * moves from one tablet to the next.
@@ -228,16 +216,13 @@ public final class AsyncKuduScanner {
 
   final long scanRequestTimeout;
 
-  private static final AtomicBoolean PARTITION_PRUNE_WARN = new AtomicBoolean(true);
-
   AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
                    List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode,
                    long scanRequestTimeout,
                    Map<String, KuduPredicate> predicates, long limit,
                    boolean cacheBlocks, boolean prefetching,
                    byte[] startPrimaryKey, byte[] endPrimaryKey,
-                   byte[] startPartitionKey, byte[] endPartitionKey,
-                   long htTimestamp, int batchSizeBytes) {
+                   long htTimestamp, int batchSizeBytes, PartitionPruner pruner) {
     checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
         "got %s", batchSizeBytes);
     checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
@@ -255,6 +240,7 @@ public final class AsyncKuduScanner {
 
     this.client = client;
     this.table = table;
+    this.pruner = pruner;
     this.readMode = readMode;
     this.orderMode = orderMode;
     this.scanRequestTimeout = scanRequestTimeout;
@@ -267,46 +253,6 @@ public final class AsyncKuduScanner {
     this.htTimestamp = htTimestamp;
     this.batchSizeBytes = batchSizeBytes;
 
-    if (!table.getPartitionSchema().isSimpleRangePartitioning() &&
-        (startPrimaryKey.length != 0 ||
-         endPrimaryKey.length != 0) &&
-        PARTITION_PRUNE_WARN.getAndSet(false)) {
-      LOG.warn("Starting full table scan. " +
-               "In the future this scan may be automatically optimized with partition pruning.");
-    }
-
-    if (table.getPartitionSchema().isSimpleRangePartitioning()) {
-      // If the table is simple range partitioned, then the partition key space
-      // is isomorphic to the primary key space. We can potentially reduce the
-      // scan length by only scanning the intersection of the primary key range
-      // and the partition key range. This is a stop-gap until real partition
-      // pruning is in place that can work across any partitioning type.
-
-      if ((endPartitionKey.length != 0 && Bytes.memcmp(startPrimaryKey, endPartitionKey) >= 0) ||
-          (endPrimaryKey.length != 0 && Bytes.memcmp(startPartitionKey, endPrimaryKey) >= 0)) {
-        // The primary key range and the partition key range do not intersect;
-        // the scan will be empty.
-        this.nextPartitionKey = startPartitionKey;
-        this.endPartitionKey = endPartitionKey;
-      } else {
-        // Assign the scan's partition key range to the intersection of the
-        // primary key and partition key ranges.
-        if (Bytes.memcmp(startPartitionKey, startPrimaryKey) < 0) {
-          this.nextPartitionKey = startPrimaryKey;
-        } else {
-          this.nextPartitionKey = startPartitionKey;
-        }
-        if (endPrimaryKey.length != 0 && Bytes.memcmp(endPartitionKey, endPrimaryKey) > 0) {
-          this.endPartitionKey = endPrimaryKey;
-        } else {
-          this.endPartitionKey = endPartitionKey;
-        }
-      }
-    } else {
-      this.nextPartitionKey = startPartitionKey;
-      this.endPartitionKey = endPartitionKey;
-    }
-
     // Map the column names to actual columns in the table schema.
     // If the user set this to 'null', we scan all columns.
     if (projectedNames != null) {
@@ -327,17 +273,9 @@ public final class AsyncKuduScanner {
       this.schema = table.getSchema();
     }
 
-    // If any of the column predicates are of type None (the predicate is known
-    // to match no rows), then the scan can be short circuited without
-    // contacting any tablet servers.
-    boolean shortCircuit = false;
-    for (KuduPredicate predicate : this.predicates.values()) {
-      if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
-        shortCircuit = true;
-        break;
-      }
-    }
-    if (shortCircuit) {
+    // If the partition pruner has pruned all partitions, then the scan can be
+    // short circuited without contacting any tablet servers.
+    if (!pruner.hasMorePartitionKeyRanges()) {
       LOG.debug("Short circuiting scan with predicates: {}", predicates.values());
       this.hasMore = false;
       this.closed = true;
@@ -454,17 +392,14 @@ public final class AsyncKuduScanner {
           invalidate();
           if (e instanceof NonCoveredRangeException) {
             NonCoveredRangeException ncre = (NonCoveredRangeException) e;
-            nextPartitionKey = ncre.getNonCoveredRangeEnd();
+            pruner.removePartitionKeyRange(ncre.getNonCoveredRangeEnd());
 
             // Stop scanning if the non-covered range is past the end partition key.
-            if (ncre.getNonCoveredRangeEnd().length == 0
-                || (endPartitionKey.length != 0
-                && Bytes.memcmp(endPartitionKey, ncre.getNonCoveredRangeEnd()) <= 0)) {
+            if (!pruner.hasMorePartitionKeyRanges()) {
               hasMore = false;
               closed = true; // the scanner is closed on the other side at this point
               return Deferred.fromResult(RowResultIterator.empty());
             }
-            nextPartitionKey = ncre.getNonCoveredRangeEnd();
             scannerId = null;
             sequenceId = 0;
             return nextRows();
@@ -548,10 +483,9 @@ public final class AsyncKuduScanner {
 
   void scanFinished() {
     Partition partition = tablet.getPartition();
+    pruner.removePartitionKeyRange(partition.getPartitionKeyEnd());
     // Stop scanning if we have scanned until or past the end partition key.
-    if (partition.isEndPartition()
-        || (this.endPartitionKey.length != 0
-            && Bytes.memcmp(this.endPartitionKey, partition.getPartitionKeyEnd()) <= 0)) {
+    if (!pruner.hasMorePartitionKeyRanges()) {
       hasMore = false;
       closed = true; // the scanner is closed on the other side at this point
       return;
@@ -560,7 +494,6 @@ public final class AsyncKuduScanner {
       LOG.debug("Done scanning tablet {} for partition {} with scanner id {}",
                 tablet.getTabletIdAsString(), tablet.getPartition(), Bytes.pretty(scannerId));
     }
-    nextPartitionKey = partition.getPartitionKeyEnd();
     scannerId = null;
     sequenceId = 0;
     invalidate();
@@ -859,7 +792,7 @@ public final class AsyncKuduScanner {
     @Override
     public byte[] partitionKey() {
       // This key is used to lookup where the request needs to go
-      return nextPartitionKey;
+      return pruner.nextPartitionKey();
     }
   }
 
@@ -885,8 +818,7 @@ public final class AsyncKuduScanner {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
-          lowerBoundPartitionKey, upperBoundPartitionKey,
-          htTimestamp, batchSizeBytes);
+          htTimestamp, batchSizeBytes, PartitionPruner.create(this));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index 96e36ac..6f358a6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -16,16 +16,18 @@
 // under the License.
 package org.apache.kudu.client;
 
+import com.google.common.primitives.Ints;
 import com.google.common.primitives.UnsignedLongs;
 import com.sangupta.murmur.Murmur2;
+
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
+import org.apache.kudu.util.ByteVec;
 import org.apache.kudu.util.Pair;
 
-import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
@@ -39,7 +41,9 @@ import java.util.List;
 @InterfaceAudience.Private
 class KeyEncoder {
 
-  private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
+  /** Non-constructable utility class. */
+  private KeyEncoder() {
+  }
 
   /**
    * Encodes the primary key of the row.
@@ -47,15 +51,29 @@ class KeyEncoder {
    * @param row the row to encode
    * @return the encoded primary key of the row
    */
-  public byte[] encodePrimaryKey(final PartialRow row) {
-    buf.reset();
-
+  public static byte[] encodePrimaryKey(final PartialRow row) {
+    ByteVec buf = ByteVec.create();
     final Schema schema = row.getSchema();
     for (int columnIdx = 0; columnIdx < schema.getPrimaryKeyColumnCount(); columnIdx++) {
       final boolean isLast = columnIdx + 1 == schema.getPrimaryKeyColumnCount();
-      encodeColumn(row, columnIdx, isLast);
+      encodeColumn(row, columnIdx, isLast, buf);
     }
-    return extractByteArray();
+    return buf.toArray();
+  }
+
+  /**
+   * Returns the bucket of the row for the given hash bucket schema. All columns
+   * in the hash bucket schema must be set in the row.
+   *
+   * @param row the row containing hash schema columns
+   * @param hashSchema the hash schema
+   * @return the hash bucket of the row
+   */
+  public static int getHashBucket(PartialRow row, HashBucketSchema hashSchema) {
+    ByteVec buf = ByteVec.create();
+    encodeColumns(row, hashSchema.getColumnIds(), buf);
+    long hash = Murmur2.hash64(buf.data(), buf.len(), hashSchema.getSeed());
+    return (int) UnsignedLongs.remainder(hash, hashSchema.getNumBuckets());
   }
 
   /**
@@ -65,28 +83,29 @@ class KeyEncoder {
    * @param partitionSchema the partition schema describing the table's partitioning
    * @return an encoded partition key
    */
-  public byte[] encodePartitionKey(PartialRow row, PartitionSchema partitionSchema) {
-    buf.reset();
+  public static byte[] encodePartitionKey(PartialRow row, PartitionSchema partitionSchema) {
+    ByteVec buf = ByteVec.create();
     if (!partitionSchema.getHashBucketSchemas().isEmpty()) {
-      ByteBuffer bucketBuf = ByteBuffer.allocate(4 * partitionSchema.getHashBucketSchemas().size());
-      bucketBuf.order(ByteOrder.BIG_ENDIAN);
-
-      for (final HashBucketSchema hashBucketSchema : partitionSchema.getHashBucketSchemas()) {
-        encodeColumns(row, hashBucketSchema.getColumnIds());
-        byte[] encodedColumns = extractByteArray();
-        long hash = Murmur2.hash64(encodedColumns,
-                                   encodedColumns.length,
-                                   hashBucketSchema.getSeed());
-        int bucket = (int) UnsignedLongs.remainder(hash, hashBucketSchema.getNumBuckets());
-        bucketBuf.putInt(bucket);
+      for (final HashBucketSchema hashSchema : partitionSchema.getHashBucketSchemas()) {
+        encodeHashBucket(getHashBucket(row, hashSchema), buf);
       }
-
-      assert bucketBuf.arrayOffset() == 0;
-      buf.write(bucketBuf.array(), 0, bucketBuf.position());
     }
 
-    encodeColumns(row, partitionSchema.getRangeSchema().getColumns());
-    return extractByteArray();
+    encodeColumns(row, partitionSchema.getRangeSchema().getColumns(), buf);
+    return buf.toArray();
+  }
+
+  /**
+   * Encodes the provided row into a range partition key.
+   *
+   * @param row the row to encode
+   * @param rangeSchema the range partition schema
+   * @return the encoded range partition key
+   */
+  public static byte[] encodeRangePartitionKey(PartialRow row, PartitionSchema.RangeSchema rangeSchema) {
+    ByteVec buf = ByteVec.create();
+    encodeColumns(row, rangeSchema.getColumns(), buf);
+    return buf.toArray();
   }
 
   /**
@@ -94,20 +113,23 @@ class KeyEncoder {
    * @param row the row containing the columns to encode
    * @param columnIds the IDs of each column to encode
    */
-  private void encodeColumns(PartialRow row, List<Integer> columnIds) {
+  private static void encodeColumns(PartialRow row, List<Integer> columnIds, ByteVec buf) {
     for (int i = 0; i < columnIds.size(); i++) {
       boolean isLast = i + 1 == columnIds.size();
-      encodeColumn(row, row.getSchema().getColumnIndex(columnIds.get(i)), isLast);
+      encodeColumn(row, row.getSchema().getColumnIndex(columnIds.get(i)), isLast, buf);
     }
   }
 
   /**
-   * Encodes a single column of a row.
+   * Encodes a single column of a row into the output buffer.
    * @param row the row being encoded
    * @param columnIdx the column index of the column to encode
    * @param isLast whether the column is the last component of the key
    */
-  private void encodeColumn(PartialRow row, int columnIdx, boolean isLast) {
+  private static void encodeColumn(PartialRow row,
+                                   int columnIdx,
+                                   boolean isLast,
+                                   ByteVec buf) {
     final Schema schema = row.getSchema();
     final ColumnSchema column = schema.getColumnByIndex(columnIdx);
     if (!row.isSet(columnIdx)) {
@@ -117,82 +139,75 @@ class KeyEncoder {
     final Type type = column.getType();
 
     if (type == Type.STRING || type == Type.BINARY) {
-      addBinaryComponent(row.getVarLengthData().get(columnIdx), isLast);
+      encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
     } else {
-      addComponent(row.getRowAlloc(),
-                   schema.getColumnOffset(columnIdx),
-                   type.getSize(),
-                   type);
+      encodeSignedInt(row.getRowAlloc(),
+                      schema.getColumnOffset(columnIdx),
+                      type.getSize(),
+                      buf);
     }
   }
 
   /**
-   * Encodes a byte buffer into the key.
+   * Encodes a variable length binary value into the output buffer.
    * @param value the value to encode
    * @param isLast whether the value is the final component in the key
+   * @param buf the output buffer
    */
-  private void addBinaryComponent(ByteBuffer value, boolean isLast) {
+  private static void encodeBinary(ByteBuffer value, boolean isLast, ByteVec buf) {
     value.reset();
 
     // TODO find a way to not have to read byte-by-byte that doesn't require extra copies. This is
     // especially slow now that users can pass direct byte buffers.
     while (value.hasRemaining()) {
       byte currentByte = value.get();
-      buf.write(currentByte);
+      buf.push(currentByte);
       if (!isLast && currentByte == 0x00) {
         // If we're a middle component of a composite key, we need to add a \x00
         // at the end in order to separate this component from the next one. However,
         // if we just did that, we'd have issues where a key that actually has
         // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
         // encode \x00 as \x00\x01. -- key_encoder.h
-        buf.write(0x01);
+        buf.push((byte) 0x01);
       }
     }
 
     if (!isLast) {
-      buf.write(0x00);
-      buf.write(0x00);
+      buf.push((byte) 0x00);
+      buf.push((byte) 0x00);
     }
   }
 
   /**
-   * Encodes a value of the given type into the key.
-   * @param value the value to encode
-   * @param offset the offset into the {@code value} buffer that the value begins
-   * @param len the length of the value
-   * @param type the type of the value to encode
+   * Encodes a signed integer into the output buffer
+   *
+   * @param value an array containing the little-endian encoded integer
+   * @param offset the offset of the value into the value array
+   * @param len the width of the value
+   * @param buf the output buffer
    */
-  private void addComponent(byte[] value, int offset, int len, Type type) {
-    switch (type) {
-      case INT8:
-      case INT16:
-      case INT32:
-      case INT64:
-      case TIMESTAMP:
-        // Picking the first byte because big endian.
-        byte lastByte = value[offset + (len - 1)];
-        lastByte = Bytes.xorLeftMostBit(lastByte);
-        buf.write(lastByte);
-        if (len > 1) {
-          for (int i = len - 2; i >= 0; i--) {
-            buf.write(value[offset + i]);
-          }
-        }
-        break;
-      default:
-        throw new IllegalArgumentException(String.format(
-            "The column type %s is not a valid key component type", type));
+  private static void encodeSignedInt(byte[] value,
+                                      int offset,
+                                      int len,
+                                      ByteVec buf) {
+    // Picking the first byte because big endian.
+    byte lastByte = value[offset + (len - 1)];
+    lastByte = Bytes.xorLeftMostBit(lastByte);
+    buf.push(lastByte);
+    if (len > 1) {
+      for (int i = len - 2; i >= 0; i--) {
+        buf.push(value[offset + i]);
+      }
     }
   }
 
   /**
-   * Returns the encoded key, and resets the key encoder to be used for another key.
-   * @return the encoded key which has been built through calls to {@link #addComponent}
+   * Encodes a hash bucket into the buffer.
+   * @param bucket the bucket
+   * @param buf the buffer
    */
-  private byte[] extractByteArray() {
-    byte[] bytes = buf.toByteArray();
-    buf.reset();
-    return bytes;
+  public static void encodeHashBucket(int bucket, ByteVec buf) {
+    buf.append(Ints.toByteArray(bucket));
   }
 
   /**
@@ -311,19 +326,19 @@ class KeyEncoder {
     // When encoding a binary column that is not the final column in the key, a
     // 0x0000 separator is used to retain lexicographic comparability. Null
     // bytes in the input are escaped as 0x0001.
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(key.remaining());
+    ByteVec buf = ByteVec.withCapacity(key.remaining());
     for (int i = key.position(); i < key.limit(); i++) {
       if (key.get(i) == 0) {
         switch (key.get(i + 1)) {
           case 0: {
-            baos.write(key.array(),
+            buf.append(key.array(),
                        key.arrayOffset() + key.position(),
                        i - key.position());
             key.position(i + 2);
-            return baos.toByteArray();
+            return buf.toArray();
           }
           case 1: {
-            baos.write(key.array(),
+            buf.append(key.array(),
                        key.arrayOffset() + key.position(),
                        i + 1 - key.position());
             i++;
@@ -335,11 +350,11 @@ class KeyEncoder {
       }
     }
 
-    baos.write(key.array(),
+    buf.append(key.array(),
                key.arrayOffset() + key.position(),
                key.remaining());
     key.position(key.limit());
-    return baos.toByteArray();
+    return buf.toArray();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index 02abc30..965a1e7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -567,6 +567,20 @@ public class KuduPredicate {
   }
 
   /**
+   * @return the encoded lower bound.
+   */
+  byte[] getLower() {
+    return lower;
+  }
+
+  /**
+   * @return the encoded upper bound.
+   */
+  byte[] getUpper() {
+    return upper;
+  }
+
+  /**
    * Returns the maximum value for the integer type.
    * @param type an integer type
    * @return the maximum value

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index a5f5081..ceee464 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -28,6 +28,7 @@ import org.apache.kudu.Common;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.client.Client.ScanTokenPB;
+import org.apache.kudu.util.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -320,15 +321,23 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       proto.setCacheBlocks(cacheBlocks);
 
       try {
-        List<LocatedTablet> tablets;
-        if (table.getPartitionSchema().isSimpleRangePartitioning()) {
-          // TODO: replace this with proper partition pruning.
-          tablets = table.getTabletsLocations(
-              lowerBoundPrimaryKey.length == 0 ? null : lowerBoundPrimaryKey,
-              upperBoundPrimaryKey.length == 0 ? null : upperBoundPrimaryKey,
+        PartitionPruner pruner = PartitionPruner.create(this);
+        List<LocatedTablet> tablets = new ArrayList<>();
+        while (pruner.hasMorePartitionKeyRanges()) {
+          Pair<byte[], byte[]> partitionRange = pruner.nextPartitionKeyRange();
+          List<LocatedTablet> newTablets = table.getTabletsLocations(
+              partitionRange.getFirst().length == 0 ? null : partitionRange.getFirst(),
+              partitionRange.getSecond().length == 0 ? null : partitionRange.getSecond(),
               timeout);
-        } else {
-          tablets = table.getTabletsLocations(timeout);
+
+          if (newTablets.isEmpty()) {
+            pruner.removePartitionKeyRange(partitionRange.getSecond());
+          } else {
+            pruner.removePartitionKeyRange(newTablets.get(newTablets.size() - 1)
+                                                     .getPartition()
+                                                     .getPartitionKeyEnd());
+          }
+          tablets.addAll(newTablets);
         }
 
         List<KuduScanToken> tokens = new ArrayList<>(tablets.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 558f404..4a4bcc1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -142,8 +142,7 @@ public class KuduScanner {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
-          lowerBoundPartitionKey, upperBoundPartitionKey,
-          htTimestamp, batchSizeBytes));
+          htTimestamp, batchSizeBytes, PartitionPruner.create(this)));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
index 1265f80..d8f40f1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
@@ -179,7 +179,7 @@ public class KuduTable {
   @Deprecated
   public List<LocatedTablet> getTabletsLocations(byte[] startKey,
                                                  byte[] endKey,
-                                                 long deadline) throws Exception{
+                                                 long deadline) throws Exception {
     return client.syncLocateTable(this, startKey, endKey, deadline);
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 31a7596..a59bf9f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -29,6 +29,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
+import org.apache.kudu.util.ByteVec;
 
 /**
  * Class used to represent parts of a row along with its schema.<p>
@@ -511,7 +512,7 @@ public class PartialRow {
    * @return a byte array containing an encoded primary key
    */
   public byte[] encodePrimaryKey() {
-    return new KeyEncoder().encodePrimaryKey(this);
+    return KeyEncoder.encodePrimaryKey(this);
   }
 
   /**
@@ -624,6 +625,104 @@ public class PartialRow {
   }
 
   /**
+   * Sets the column to the provided raw value.
+   * @param index the index of the column to set
+   * @param value the raw value
+   */
+  void setRaw(int index, byte[] value) {
+    Type type = schema.getColumnByIndex(index).getType();
+    switch (type) {
+      case BOOL:
+      case INT8:
+      case INT16:
+      case INT32:
+      case INT64:
+      case TIMESTAMP:
+      case FLOAT:
+      case DOUBLE: {
+        Preconditions.checkArgument(value.length == type.getSize());
+        System.arraycopy(value, 0, rowAlloc, getPositionInRowAllocAndSetBitSet(index), value.length);
+        break;
+      }
+      case STRING:
+      case BINARY: {
+        addVarLengthData(index, value);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Increments the column at the given index, returning {@code false} if the
+   * value is already the maximum.
+   *
+   * @param index the column index to increment
+   * @return {@code true} if the column is successfully incremented, or {@code false} if
+   *         it is already the maximum value
+   */
+  boolean incrementColumn(int index) {
+    Type type = schema.getColumnByIndex(index).getType();
+    Preconditions.checkState(isSet(index));
+    int offset = getPositionInRowAllocAndSetBitSet(index);
+    switch (type) {
+      case BOOL: {
+        boolean isFalse = rowAlloc[offset] == 0;
+        rowAlloc[offset] = 1;
+        return isFalse;
+      }
+      case INT8:{
+        byte existing = rowAlloc[offset];
+        if (existing == Byte.MAX_VALUE) return false;
+        rowAlloc[offset] = (byte) (existing + 1);
+        return true;
+      }
+      case INT16: {
+        short existing = Bytes.getShort(rowAlloc, offset);
+        if (existing == Short.MAX_VALUE) return false;
+        Bytes.setShort(rowAlloc, (short) (existing + 1), offset);
+        return true;
+      }
+      case INT32: {
+        int existing = Bytes.getInt(rowAlloc, offset);
+        if (existing == Integer.MAX_VALUE) return false;
+        Bytes.setInt(rowAlloc, existing + 1, offset);
+        return true;
+      }
+      case INT64:
+      case TIMESTAMP: {
+        long existing = Bytes.getLong(rowAlloc, offset);
+        if (existing == Long.MAX_VALUE) return false;
+        Bytes.setLong(rowAlloc, existing + 1, offset);
+        return true;
+      }
+      case FLOAT: {
+        float existing = Bytes.getFloat(rowAlloc, offset);
+        float incremented = Math.nextAfter(existing, Float.POSITIVE_INFINITY);
+        if (existing == incremented) return false;
+        Bytes.setFloat(rowAlloc, incremented, offset);
+        return true;
+      }
+      case DOUBLE: {
+        double existing = Bytes.getFloat(rowAlloc, offset);
+        double incremented = Math.nextAfter(existing, Double.POSITIVE_INFINITY);
+        if (existing == incremented) return false;
+        Bytes.setDouble(rowAlloc, incremented, offset);
+        return true;
+      }
+      case STRING:
+      case BINARY: {
+        ByteBuffer data = varLengthData.get(index);
+        int len = data.limit() - data.position();
+        byte[] incremented = new byte[len + 1];
+        System.arraycopy(data.array(), data.arrayOffset() + data.position(), incremented, 0, len);
+        addVarLengthData(index, incremented);
+        return true;
+      }
+    }
+    throw new RuntimeException("unreachable");
+  }
+
+  /**
    * Get the schema used for this row.
    * @return a schema that came from KuduTable
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
new file mode 100644
index 0000000..4545767
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.util.ByteVec;
+import org.apache.kudu.util.Pair;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+
+@InterfaceAudience.Private
+@NotThreadSafe
+public class PartitionPruner {
+
+  private final Deque<Pair<byte[], byte[]>> rangePartitions;
+
+  /**
+   * Constructs a new partition pruner.
+   * @param rangePartitions the valid partition key ranges, in reverse sorted order
+   */
+  private PartitionPruner(Deque<Pair<byte[], byte[]>> rangePartitions) {
+    this.rangePartitions = rangePartitions;
+  }
+
+  /**
+   * @return a partition pruner that will prune all partitions
+   */
+  private static PartitionPruner empty() {
+    return new PartitionPruner(new ArrayDeque<Pair<byte[], byte[]>>());
+  }
+
+  /**
+   * Creates a new partition pruner for the provided scan.
+   * @param scanner the scan to prune
+   * @return a partition pruner
+   */
+  public static PartitionPruner create(AbstractKuduScannerBuilder<?, ?> scanner) {
+    Schema schema = scanner.table.getSchema();
+    PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
+    PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+    Map<String, KuduPredicate> predicates = scanner.predicates;
+
+    // Check if the scan can be short circuited entirely by checking the primary
+    // key bounds and predicates. This also allows us to assume some invariants of the
+    // scan, such as no None predicates and that the lower bound PK < upper
+    // bound PK.
+    if (scanner.upperBoundPrimaryKey.length > 0 &&
+        Bytes.memcmp(scanner.lowerBoundPrimaryKey, scanner.upperBoundPrimaryKey) >= 0) {
+      return PartitionPruner.empty();
+    }
+    for (KuduPredicate predicate : predicates.values()) {
+      if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
+        return PartitionPruner.empty();
+      }
+    }
+
+    // Build a set of partition key ranges which cover the tablets necessary for
+    // the scan.
+    //
+    // Example predicate sets and resulting partition key ranges, based on the
+    // following tablet schema:
+    //
+    // CREATE TABLE t (a INT32, b INT32, c INT32) PRIMARY KEY (a, b, c)
+    // DISTRIBUTE BY RANGE (c)
+    //               HASH (a) INTO 2 BUCKETS
+    //               HASH (b) INTO 3 BUCKETS;
+    //
+    // Assume that hash(0) = 0 and hash(2) = 2.
+    //
+    // | Predicates | Partition Key Ranges                                   |
+    // +------------+--------------------------------------------------------+
+    // | a = 0      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+    // | b = 2      |                                                        |
+    // | c = 0      |                                                        |
+    // +------------+--------------------------------------------------------+
+    // | a = 0      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
+    // | b = 2      |                                                        |
+    // +------------+--------------------------------------------------------+
+    // | a = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
+    // | c = 0      | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
+    // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+    // +------------+--------------------------------------------------------+
+    // | b = 2      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+    // | c = 0      | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
+    // +------------+--------------------------------------------------------+
+    // | a = 0      | [(bucket=0), (bucket=1))                               |
+    // +------------+--------------------------------------------------------+
+    // | b = 2      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
+    // |            | [(bucket=1, bucket=2), (bucket=1, bucket=3))           |
+    // +------------+--------------------------------------------------------+
+    // | c = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
+    // |            | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
+    // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+    // |            | [(bucket=1, bucket=0, c=0), (bucket=1, bucket=0, c=1)) |
+    // |            | [(bucket=1, bucket=1, c=0), (bucket=1, bucket=1, c=1)) |
+    // |            | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
+    // +------------+--------------------------------------------------------+
+    // | None       | [(), ())                                               |
+    //
+    // If the partition key is considered as a sequence of the hash bucket
+    // components and a range component, then a few patterns emerge from the
+    // examples above:
+    //
+    // 1) The partition keys are truncated after the final constrained component
+    //    (hash bucket components are constrained when the scan is limited to a
+    //    single bucket via equality predicates on that component, while range
+    //    components are constrained if they have an upper or lower bound via
+    //    range or equality predicates on that component).
+    //
+    // 2) If the final constrained component is a hash bucket, then the
+    //    corresponding bucket in the upper bound is incremented in order to make
+    //    it an exclusive key.
+    //
+    // 3) The number of partition key ranges in the result is equal to the product
+    //    of the number of buckets of each unconstrained hash component which come
+    //    before a final constrained component. If there are no unconstrained hash
+    //    components, then the number of partition key ranges is one.
+
+    // Step 1: Build the range portion of the partition key. If the range partition
+    // columns match the primary key columns, then we can substitute the primary
+    // key bounds, if they are tighter.
+    byte[] rangeLowerBound = pushPredicatesIntoLowerBoundRangeKey(schema, rangeSchema, predicates);
+    byte[] rangeUpperBound = pushPredicatesIntoUpperBoundRangeKey(schema, rangeSchema, predicates);
+    if (partitionSchema.isSimpleRangePartitioning()) {
+      if (Bytes.memcmp(rangeLowerBound, scanner.lowerBoundPrimaryKey) < 0) {
+        rangeLowerBound = scanner.lowerBoundPrimaryKey;
+      }
+      if (scanner.upperBoundPrimaryKey.length > 0 &&
+          (rangeUpperBound.length == 0 ||
+           Bytes.memcmp(rangeUpperBound, scanner.upperBoundPrimaryKey) > 0)) {
+        rangeUpperBound = scanner.upperBoundPrimaryKey;
+      }
+    }
+
+    // Step 2: Create the hash bucket portion of the partition key.
+
+    // The list of hash buckets per hash component, or null if the component is
+    // not constrained.
+    List<Integer> hashBuckets = new ArrayList<>(partitionSchema.getHashBucketSchemas().size());
+    for (PartitionSchema.HashBucketSchema hashSchema : partitionSchema.getHashBucketSchemas()) {
+      hashBuckets.add(pushPredicatesIntoHashBucket(schema, hashSchema, predicates));
+    }
+
+    // The index of the final constrained component in the partition key.
+    int constrainedIndex = 0;
+    if (rangeLowerBound.length > 0 || rangeUpperBound.length > 0) {
+      // The range component is constrained.
+      constrainedIndex = hashBuckets.size();
+    } else {
+      // Search the hash bucket constraints from right to left, looking for the
+      // first constrained component.
+      for (int i = hashBuckets.size(); i > 0; i--) {
+        if (hashBuckets.get(i - 1) != null) {
+          constrainedIndex = i;
+          break;
+        }
+      }
+    }
+
+    // Build up a set of partition key ranges out of the hash components.
+    //
+    // Each constrained hash component simply appends its bucket number to the
+    // partition key ranges (possibly incrementing the upper bound by one bucket
+    // number if this is the final constraint, see note 2 in the example above).
+    //
+    // Each unconstrained hash component results in creating a new partition key
+    // range for each bucket of the hash component.
+    List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
+    partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
+
+    ByteBuffer bucketBuf = ByteBuffer.allocate(4);
+    bucketBuf.order(ByteOrder.BIG_ENDIAN);
+    for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
+      // This is the final partition key component if this is the final constrained
+      // bucket, and the range upper bound is empty. In this case we need to
+      // increment the bucket on the upper bound to convert from inclusive to
+      // exclusive.
+      boolean isLast = hashIdx + 1 == constrainedIndex && rangeUpperBound.length == 0;
+
+      if (hashBuckets.get(hashIdx) != null) {
+        // This hash component is constrained by equality predicates to a single
+        // hash bucket.
+        int bucket = hashBuckets.get(hashIdx);
+        int bucketUpper = isLast ? bucket + 1 : bucket;
+
+        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
+          KeyEncoder.encodeHashBucket(bucket, partitionKeyRange.getFirst());
+          KeyEncoder.encodeHashBucket(bucketUpper, partitionKeyRange.getSecond());
+        }
+      } else {
+        PartitionSchema.HashBucketSchema hashSchema =
+            partitionSchema.getHashBucketSchemas().get(hashIdx);
+        // Add a partition key range for each possible hash bucket.
+        List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
+            new ArrayList<>(partitionKeyRanges.size() * hashSchema.getNumBuckets());
+        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
+          for (int bucket = 0; bucket < hashSchema.getNumBuckets(); bucket++) {
+            int bucketUpper = isLast ? bucket + 1 : bucket;
+            ByteVec lower = partitionKeyRange.getFirst().clone();
+            ByteVec upper = partitionKeyRange.getFirst().clone();
+            KeyEncoder.encodeHashBucket(bucket, lower);
+            KeyEncoder.encodeHashBucket(bucketUpper, upper);
+            newPartitionKeyRanges.add(new Pair<>(lower, upper));
+          }
+        }
+        partitionKeyRanges = newPartitionKeyRanges;
+      }
+    }
+
+    // Step 3: append the (possibly empty) range bounds to the partition key ranges.
+    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+      range.getFirst().append(rangeLowerBound);
+      range.getSecond().append(rangeUpperBound);
+    }
+
+    // Step 4: Filter ranges that fall outside the scan's upper and lower bound partition keys.
+    Deque<Pair<byte[], byte[]>> partitionKeyRangeBytes = new ArrayDeque<>(partitionKeyRanges.size());
+    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+      byte[] lower = range.getFirst().toArray();
+      byte[] upper = range.getSecond().toArray();
+
+      // Sanity check that the lower bound is less than the upper bound.
+      assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;
+
+      // Find the intersection of the ranges.
+      if (scanner.lowerBoundPartitionKey.length > 0 &&
+          (lower.length == 0 || Bytes.memcmp(lower, scanner.lowerBoundPartitionKey) < 0)) {
+        lower = scanner.lowerBoundPartitionKey;
+      }
+      if (scanner.upperBoundPartitionKey.length > 0 &&
+          (upper.length == 0 || Bytes.memcmp(upper, scanner.upperBoundPartitionKey) > 0)) {
+        upper = scanner.upperBoundPartitionKey;
+      }
+
+      // If the intersection is valid, then add it as a range partition.
+      if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
+        partitionKeyRangeBytes.add(new Pair<>(lower, upper));
+      }
+    }
+
+    return new PartitionPruner(partitionKeyRangeBytes);
+  }
+
+  /** @return {@code true} if there are more range partitions to scan. */
+  public boolean hasMorePartitionKeyRanges() {
+    return !rangePartitions.isEmpty();
+  }
+
+  /** @return the inclusive lower bound partition key of the next tablet to scan. */
+  public byte[] nextPartitionKey() {
+    return rangePartitions.getFirst().getFirst();
+  }
+
+  /** @return the next range partition key range to scan. */
+  public Pair<byte[], byte[]> nextPartitionKeyRange() {
+    return rangePartitions.getFirst();
+  }
+
+  /** Removes all partition key ranges through the provided exclusive upper bound. */
+  public void removePartitionKeyRange(byte[] upperBound) {
+    if (upperBound.length == 0) {
+      rangePartitions.clear();
+      return;
+    }
+
+    while (!rangePartitions.isEmpty()) {
+      Pair<byte[], byte[]> range = rangePartitions.getFirst();
+      if (Bytes.memcmp(upperBound, range.getFirst()) <= 0) break;
+      rangePartitions.removeFirst();
+      if (range.getSecond().length == 0 || Bytes.memcmp(upperBound, range.getSecond()) < 0) {
+        // The upper bound falls in the middle of this range, so add it back
+        // with the restricted bounds.
+        rangePartitions.addFirst(new Pair<>(upperBound, range.getSecond()));
+        break;
+      }
+    }
+  }
+
+  /**
+   * @param partition to prune
+   * @return {@code true} if the partition should be pruned
+   */
+  boolean shouldPrune(Partition partition) {
+    // The C++ version uses binary search to do this with fewer key comparisons,
+    // but the algorithm isn't easily translatable, so this just uses a linear
+    // search.
+    for (Pair<byte[], byte[]> range : rangePartitions) {
+
+      // Continue searching the list of ranges if the partition is greater than
+      // the current range.
+      if (range.getSecond().length > 0 &&
+          Bytes.memcmp(range.getSecond(), partition.getPartitionKeyStart()) <= 0) {
+        continue;
+      }
+
+      // If the current range is greater than the partitions,
+      // then the partition should be pruned.
+      return partition.getPartitionKeyEnd().length > 0 &&
+             Bytes.memcmp(partition.getPartitionKeyEnd(), range.getFirst()) <= 0;
+    }
+
+    // The partition is greater than all ranges.
+    return true;
+  }
+
+  private static List<Integer> idsToIndexes(Schema schema, List<Integer> ids) {
+    List<Integer> indexes = new ArrayList<>(ids.size());
+    for (int id : ids) {
+      indexes.add(schema.getColumnIndex(id));
+    }
+    return indexes;
+  }
+
+  private static boolean incrementKey(PartialRow row, List<Integer> keyIndexes) {
+    for (int i = keyIndexes.size() - 1; i >= 0; i--) {
+      if (row.incrementColumn(keyIndexes.get(i))) return true;
+    }
+    return false;
+  }
+
+  /**
+   * Translates column predicates into a lower bound range partition key.
+   * @param schema the table schema
+   * @param rangeSchema the range partition schema
+   * @param predicates the predicates
+   * @return a lower bound range partition key
+   */
+  private static byte[] pushPredicatesIntoLowerBoundRangeKey(Schema schema,
+                                                             PartitionSchema.RangeSchema rangeSchema,
+                                                             Map<String, KuduPredicate> predicates) {
+    PartialRow row = schema.newPartialRow();
+    int pushedPredicates = 0;
+
+    List<Integer> rangePartitionColumnIdxs = idsToIndexes(schema, rangeSchema.getColumns());
+
+    // Copy predicates into the row in range partition key column order,
+    // stopping after the first missing predicate.
+    for (int idx : rangePartitionColumnIdxs) {
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      if (predicate == null) break;
+
+      if (predicate.getType() != KuduPredicate.PredicateType.EQUALITY &&
+          predicate.getType() != KuduPredicate.PredicateType.RANGE) {
+        throw new IllegalArgumentException(
+            String.format("unexpected predicate type can not be pushed into key: %s", predicate));
+      }
+
+      byte[] value = predicate.getLower();
+      if (value == null) break;
+      row.setRaw(idx, value);
+      pushedPredicates++;
+    }
+
+    // If no predicates were pushed, no need to do any more work.
+    if (pushedPredicates == 0) return AsyncKuduClient.EMPTY_ARRAY;
+
+    // For each remaining column in the partition key, fill it with the minimum value.
+    Iterator<Integer> remainingIdxs = rangePartitionColumnIdxs.listIterator(pushedPredicates);
+    while (remainingIdxs.hasNext()) {
+      row.setMin(remainingIdxs.next());
+    }
+
+    return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
+  }
+
+  /**
+   * Translates column predicates into an upper bound range partition key.
+   * @param schema the table schema
+   * @param rangeSchema the range partition schema
+   * @param predicates the predicates
+   * @return an upper bound range partition key
+   */
+  private static byte[] pushPredicatesIntoUpperBoundRangeKey(Schema schema,
+                                                             PartitionSchema.RangeSchema rangeSchema,
+                                                             Map<String, KuduPredicate> predicates) {
+    PartialRow row = schema.newPartialRow();
+    int pushedPredicates = 0;
+    KuduPredicate finalPredicate = null;
+
+    List<Integer> rangePartitionColumnIdxs = idsToIndexes(schema, rangeSchema.getColumns());
+
+    // Step 1: copy predicates into the row in range partition key column order, stopping after
+    // the first missing predicate.
+    for (int idx : rangePartitionColumnIdxs) {
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      if (predicate == null) break;
+
+      if (predicate.getType() == KuduPredicate.PredicateType.EQUALITY) {
+        byte[] value = predicate.getLower();
+        row.setRaw(idx, value);
+        pushedPredicates++;
+        finalPredicate = predicate;
+      } else if (predicate.getType() == KuduPredicate.PredicateType.RANGE) {
+
+        if (predicate.getUpper() != null) {
+          row.setRaw(idx, predicate.getUpper());
+          pushedPredicates++;
+          finalPredicate = predicate;
+        }
+
+        // After the first column with a range constraint we stop pushing
+        // constraints into the upper bound. Instead, we push minimum values
+        // to the remaining columns (below), which is the maximally tight
+        // constraint.
+        break;
+      } else {
+        throw new IllegalArgumentException(
+            String.format("unexpected predicate type can not be pushed into key: %s", predicate));
+      }
+    }
+
+    // If no predicates were pushed, no need to do any more work.
+    if (pushedPredicates == 0) return AsyncKuduClient.EMPTY_ARRAY;
+
+    // Step 2: If the final predicate is an equality predicate, increment the
+    // key to convert it to an exclusive upper bound.
+    if (finalPredicate.getType() == KuduPredicate.PredicateType.EQUALITY) {
+      incrementKey(row, rangePartitionColumnIdxs.subList(0, pushedPredicates));
+    }
+
+    // Step 3: Fill the remaining columns without predicates with the min value.
+    Iterator<Integer> remainingIdxs = rangePartitionColumnIdxs.listIterator(pushedPredicates);
+    while (remainingIdxs.hasNext()) {
+      row.setMin(remainingIdxs.next());
+    }
+
+    return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
+  }
+
+  /**
+   * Determines if the provided predicates can constrain the hash component to a
+   * single bucket, and if so, returns the bucket number. Otherwise returns null.
+   */
+  private static Integer pushPredicatesIntoHashBucket(Schema schema,
+                                                      PartitionSchema.HashBucketSchema hashSchema,
+                                                      Map<String, KuduPredicate> predicates) {
+    List<Integer> columnIdxs = idsToIndexes(schema, hashSchema.getColumnIds());
+    PartialRow row = schema.newPartialRow();
+    for (int idx : columnIdxs) {
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      if (predicate == null || predicate.getType() != KuduPredicate.PredicateType.EQUALITY) {
+        return null;
+      }
+      row.setRaw(idx, predicate.getLower());
+    }
+    return KeyEncoder.getHashBucket(row, hashSchema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index 19174ca..fa4ac4c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -79,7 +79,7 @@ public class PartitionSchema {
    * @return a byte array containing the encoded partition key of the row
    */
   public byte[] encodePartitionKey(PartialRow row) {
-    return new KeyEncoder().encodePartitionKey(row, this);
+    return KeyEncoder.encodePartitionKey(row, this);
   }
 
   public RangeSchema getRangeSchema() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/main/java/org/apache/kudu/util/ByteVec.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/ByteVec.java b/java/kudu-client/src/main/java/org/apache/kudu/util/ByteVec.java
new file mode 100644
index 0000000..8d50214
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/ByteVec.java
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Bytes;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * A vector of primitive bytes.
+ *
+ * The vector is backed by a contiguous array, and offers efficient random
+ * access.
+ */
+@InterfaceAudience.Private
+@NotThreadSafe
+public final class ByteVec {
+
+  /** Default initial capacity for new vectors. */
+  @VisibleForTesting
+  static final int DEFAULT_CAPACITY = 32;
+
+  /** data backing the vector. */
+  private byte[] data;
+
+  /** offset of first unused element in data. */
+  private int len;
+
+  private ByteVec(int capacity) {
+    data = new byte[capacity];
+    len = 0;
+  }
+
+  private ByteVec(byte[] data) {
+    this.data = data;
+    this.len = data.length;
+  }
+
+  /**
+   * Creates a new vector.
+   * @return the new vector.
+   */
+  public static ByteVec create() {
+    return new ByteVec(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new vector with the specified capacity.
+   * @param capacity the initial capacity of the vector
+   * @return a new vector with the specified capacity
+   */
+  public static ByteVec withCapacity(int capacity) {
+    return new ByteVec(capacity);
+  }
+
+  /**
+   * Wrap an existing array with a vector.
+   * The array should not be modified after this call.
+   * @param data the initial data for the vector
+   * @return a vector wrapping the data
+   */
+  public static ByteVec wrap(byte[] data) {
+    return new ByteVec(data);
+  }
+
+  /** Returns the number of elements the vector can hold without reallocating. */
+  public int capacity() {
+    return data.length;
+  }
+
+  /** Returns the primitive array backing the vector. The caller should not modify the array. */
+  public byte[] data() {
+    return data;
+  }
+
+  /** Returns the number of elements in the vector. */
+  public int len() {
+    return len;
+  }
+
+  /** Returns {@code true} if the vector is empty. */
+  public boolean isEmpty() {
+    return len == 0;
+  }
+
+  /**
+   * Reserves capacity for at least {@code additional} more elements to be
+   * inserted into the vector.
+   *
+   * The vector may reserve more space to avoid frequent reallocations. If the
+   * vector already has sufficient capacity, no reallocation will happen.
+   *
+   * @param additional capacity to reserve
+   */
+  public void reserve(int additional) {
+    Preconditions.checkArgument(additional >= 0, "negative additional");
+    if (data.length - len >= additional) return;
+    // Use a 1.5x growth factor. According to
+    // https://stackoverflow.com/questions/1100311/what-is-the-ideal-growth-rate-for-a-dynamically-allocated-array
+    // this is close to the ideal ratio, although it isn't clear if that holds
+    // for managed languages.
+    data = Arrays.copyOf(data, Math.max(len + additional,
+                                        data.length + data.length / 2));
+  }
+
+  /**
+   * Reserves capacity for exactly {@code additional} more elements to be
+   * inserted into the vector.
+   *
+   * If the vector already has sufficient capacity, no reallocation will happen.
+   *
+   * @param additional capacity to reserve
+   */
+  public void reserveExact(int additional) {
+    Preconditions.checkArgument(additional >= 0, "negative additional");
+    if (data.length - len >= additional) return;
+    data = Arrays.copyOf(data, len + additional);
+  }
+
+  /**
+   * Shrink the capacity of the vector to match the length.
+   */
+  public void shrinkToFit() {
+    if (len < data.length) data = Arrays.copyOf(data, len);
+  }
+
+  /**
+   * Shorten the vector to be {@code len} elements long.
+   * If {@code len} is greater than the vector's current length,
+   * this has no effect.
+   * @param len the new length of the vector
+   */
+  public void truncate(int len) {
+    Preconditions.checkArgument(len >= 0, "negative len");
+    this.len = Math.min(this.len, len);
+  }
+
+  /**
+   * Removes all elements from the vector.
+   * No reallocation will be performed.
+   */
+  public void clear() {
+    truncate(0);
+  }
+
+  /**
+   * Appends an element to the vector.
+   * @param element the element to append
+   */
+  public void push(byte element) {
+    reserve(1);
+    data[len++] = element;
+  }
+
+  /**
+   * Sets the element at {@code index} to the provided value.
+   * @param index of the element to set
+   * @param value to set the element to
+   * @throws IndexOutOfBoundsException if {@code index} is not valid
+   */
+  public void set(int index, byte value) {
+    if (index >= len) {
+      throw new IndexOutOfBoundsException(String.format("index: %s, len: %s", index, len));
+    }
+    data[index] = value;
+  }
+
+  /**
+   * Appends the bytes from another byte array to this vec.
+   * @param values the values to append
+   * @param offset the offset into {@code values} to append from
+   * @param len the number of bytes from {@code values} to append
+   */
+  public void append(byte[] values, int offset, int len) {
+    reserve(len);
+    System.arraycopy(values, offset, this.data, this.len, len);
+    this.len += len;
+  }
+
+  /**
+   * Appends all of the bytes from another byte array to this vec.
+   * @param values the values to append
+   */
+  public void append(byte[] values) {
+    append(values, 0, values.length);
+  }
+
+  /**
+   * Concatenates another vector onto the end of this one.
+   * @param other the other vector to concatenate onto this one
+   */
+  public void append(ByteVec other) {
+    append(other.data, 0, other.len);
+  }
+
+  /**
+   * Returns the element at the specified position.
+   * @param index of the element to return
+   * @return the element at the specified position
+   * @throws IndexOutOfBoundsException if the index is out of range
+   */
+  public byte get(int index) {
+    if (index >= len) {
+      throw new IndexOutOfBoundsException(String.format("index: %s, len: %s", index, len));
+    }
+    return data[index];
+  }
+
+  /**
+   * Returns a list view of the vector.
+   * The vector should not be concurrently modified while the list is in use.
+   * @return a list view of the vector
+   */
+  public List<Byte> asList() {
+    List<Byte> list = Bytes.asList(data);
+    if (len < data.length) return list.subList(0, len);
+    return list;
+  }
+
+  /**
+   * @return a copy of the vector as a byte[].
+   */
+  public byte[] toArray() {
+    return Arrays.copyOf(data, len);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    if (len == 0) {
+      return "[]";
+    }
+
+    StringBuilder builder = new StringBuilder(4 + len * 2);
+    builder.append("[0x");
+    builder.append(BaseEncoding.base16().encode(data, 0, len));
+    builder.append(']');
+    return builder.toString();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ByteVec other = (ByteVec) o;
+    if (len != other.len) return false;
+    for (int i = 0; i < len; i++) if (data[i] != other.data[i]) return false;
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    int result = len;
+    for (int i = 0; i < len; i++) result = 31 * result + data[i];
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ByteVec clone() {
+    ByteVec clone = ByteVec.withCapacity(data.length);
+    clone.append(this);
+    return clone;
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7dab48e/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 37ddabc..57a630e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -16,7 +16,6 @@
 // under the License.
 package org.apache.kudu.client;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -237,7 +236,6 @@ public class TestKeyEncoding {
 
   @Test
   public void testPartitionKeyEncoding() {
-    KeyEncoder encoder = new KeyEncoder();
     Schema schema = buildSchema(
         new ColumnSchemaBuilder("a", Type.INT32).key(true),
         new ColumnSchemaBuilder("b", Type.STRING).key(true),
@@ -254,7 +252,7 @@ public class TestKeyEncoding {
     rowA.addInt("a", 0);
     rowA.addString("b", "");
     rowA.addString("c", "");
-    assertBytesEquals(encoder.encodePartitionKey(rowA, partitionSchema),
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowA, partitionSchema),
                       new byte[]{
                           0, 0, 0, 0,           // hash(0, "")
                           0, 0, 0, 0x14,        // hash("")
@@ -266,7 +264,7 @@ public class TestKeyEncoding {
     rowB.addInt("a", 1);
     rowB.addString("b", "");
     rowB.addString("c", "");
-    assertBytesEquals(encoder.encodePartitionKey(rowB, partitionSchema),
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowB, partitionSchema),
                       new byte[]{
                           0, 0, 0, 0x5,         // hash(1, "")
                           0, 0, 0, 0x14,        // hash("")
@@ -278,7 +276,7 @@ public class TestKeyEncoding {
     rowC.addInt("a", 0);
     rowC.addString("b", "b");
     rowC.addString("c", "c");
-    assertBytesEquals(encoder.encodePartitionKey(rowC, partitionSchema),
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowC, partitionSchema),
                       new byte[]{
                           0, 0, 0, 0x1A,        // hash(0, "b")
                           0, 0, 0, 0x1D,        // hash("c")
@@ -291,7 +289,7 @@ public class TestKeyEncoding {
     rowD.addInt("a", 1);
     rowD.addString("b", "b");
     rowD.addString("c", "c");
-    assertBytesEquals(encoder.encodePartitionKey(rowD, partitionSchema),
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowD, partitionSchema),
                       new byte[]{
                           0, 0, 0, 0,           // hash(1, "b")
                           0, 0, 0, 0x1D,        // hash("c")