You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/04/18 20:12:08 UTC

[1/2] incubator-kudu git commit: KUDU-1312 - design doc for scan token API

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 22160e33c -> d50964eb4


KUDU-1312 - design doc for scan token API

Change-Id: Id208cecababf15e1671a01a219d4599adfcd4163
Reviewed-on: http://gerrit.cloudera.org:8080/2443
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/051e1325
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/051e1325
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/051e1325

Branch: refs/heads/master
Commit: 051e1325a99ae1e871e8a259b14d9080fbea25b4
Parents: 22160e3
Author: Dan Burkert <da...@cloudera.com>
Authored: Thu Mar 3 14:17:38 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Apr 18 18:08:12 2016 +0000

----------------------------------------------------------------------
 docs/design-docs/README.md      |  2 +-
 docs/design-docs/scan-tokens.md | 83 ++++++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/051e1325/docs/design-docs/README.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/README.md b/docs/design-docs/README.md
index 5ecc549..24566e6 100644
--- a/docs/design-docs/README.md
+++ b/docs/design-docs/README.md
@@ -19,7 +19,6 @@ written from a point-of-time view, and do not necessarily represent the current
 state of the system. They are useful for learning why design decisions were
 made.
 
-
 | Document | Component(s) | Discussion |
 | -------- | ------------ | ---------- |
 | [Scan optimization and partition pruning](scan-optimization-partition-pruning.md) | Client, Tablet | [gerrit](http://gerrit.cloudera.org:8080/2149) |
@@ -35,3 +34,4 @@ made.
 | [Maintenance operation scheduling](triggering-maintenance-ops.md) | Master, Tablet Server | N/A |
 | [C++ client design and impl. details](cpp-client.md) | Client | N/A |
 | [(old) Heartbeating between tservers and multiple masters](old-multi-master-heartbeating.md) | Master | [gerrit](http://gerrit.cloudera.org:8080/2495) |
+| [Scan Token API](scan-tokens.md) | Client | [gerrit](http://gerrit.cloudera.org:8080/2443) |

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/051e1325/docs/design-docs/scan-tokens.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/scan-tokens.md b/docs/design-docs/scan-tokens.md
new file mode 100644
index 0000000..748b8e6
--- /dev/null
+++ b/docs/design-docs/scan-tokens.md
@@ -0,0 +1,83 @@
+<!---
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Scan Token API
+
+## Motivation
+
+Most distributed compute frameworks that integrate with Kudu need the ability to
+split Kudu tables into physical sections so that computation can be distributed
+and parallelized. Additionally, these frameworks often want to take advantage of
+data locality by executing tasks on or close to the physical machines that hold
+the data they are working on.
+
+Kudu should have a well defined API that all compute integrations can use to
+implement parallel scanning with locality hints. This API should be amenable to
+serialization so that individual scanner tasks may be shipped to remote task
+executors.
+
+## Design
+
+Kudu will provide a client API that takes a scan description (e.g. table name,
+projected columns, fault tolerance, snapshot timestamp, lower and upper primary
+key bounds, predicates, etc.) and returns a sequence of scan tokens. For
+example:
+
+```java
+ScanTokenBuilder builder = client.newScanTokenBuilder();
+builder.setProjectedColumnNames(ImmutableList.of("col1", "col2"));
+List<ScanToken> tokens = builder.build();
+```
+
+Scan tokens may be used to create a scanner over a single tablet. Additionally,
+scan tokens have a well defined, but opaque to the client, serialization format
+so that tokens may be serialized and deserialized by the compute framework, and
+even passed between processes using different Kudu client versions or
+implementations (JVM vs. C++). Continuing the previous example:
+
+```java
+byte[] serializedToken = tokens.get(0).serialize();
+
+// later, possibly in a different process
+
+KuduScanner scanner = ScanToken.deserializeIntoScanner(serializedToken, client);
+```
+
+Along with the serializable scan token, the API will provide a location hint
+containing the replicas hosting the data. This will be done via the existing
+replica location APIs (`org.kududb.client.LocatedTablet` in the Java client, and
+`std::vector<KuduTabletServer*>` in the C++ client).
+
+Initially, the scan token API should support creating a single token per tablet
+in the table (less tablets which may be pruned, if the client supports pruning,
+see the [partition pruning design doc](scan-optimization-partition-pruning.md)).
+Internally, limiting a token to a single tablet should be done by including
+partition key limits in the token, and setting those limits on the scanner when
+deserializing the token. Alternatively, the tablet ID could be directly included
+in the token, but this may have unintended consequences if tablet splitting
+features are added to Kudu. A token could be created before a split event, with
+the resulting scan happening after the split. By setting tablet limits through
+partition key bounds instead of tablet IDs, it is clear that the scanner should
+retrieve results from all of the child tablets.
+
+Eventually, the scan token API should allow applications to further split scan
+tokens so that inter-tablet parallelism can be acheived. Splitting tokens may be
+achieved by assigning the child tokens non-overlapping sections of the primary
+key range. Even without the token splitting feature built in to the API,
+applications can simulate the effect by building multiple sets of scan tokens
+using non-overlapping sets of primary key bounds. However, it is likely that in
+the future Kudu will be able to choose a more optimal primary key split point
+than the application, perhaps through an internal tablet statistics API.
+Additionally, having the API built in to the Kudu client further decreases the
+effort required to write high performance integrations for Kudu.


[2/2] incubator-kudu git commit: [java-client] implement scan token API

Posted by jd...@apache.org.
[java-client] implement scan token API

This is a first pass at implementing the
[scan token](http://gerrit.cloudera.org:8080/#/c/2443/) API for the Java client.
Scan tokens have been integrated into the Kudu MapReduce input format, but token
splitting has been left to a follow up commit.

Change-Id: I20eff9bf51e893226fc3bc47726565ca62c054e3
Reviewed-on: http://gerrit.cloudera.org:8080/2592
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Jean-Daniel Cryans


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d50964eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d50964eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d50964eb

Branch: refs/heads/master
Commit: d50964eb4492c8d78504313f8b84af8ae501c4a2
Parents: 051e132
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon Mar 7 12:45:56 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Apr 18 18:11:34 2016 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc                         |  18 ++
 .../client/AbstractKuduScannerBuilder.java      |   2 +
 .../java/org/kududb/client/AsyncKuduClient.java |   4 +-
 .../src/main/java/org/kududb/client/Bytes.java  |   2 +-
 .../main/java/org/kududb/client/KuduClient.java |  14 +-
 .../java/org/kududb/client/KuduPredicate.java   |   6 +-
 .../java/org/kududb/client/KuduScanToken.java   | 315 +++++++++++++++++++
 .../java/org/kududb/client/LocatedTablet.java   |   4 +-
 .../java/org/kududb/client/BaseKuduTest.java    |   4 +-
 .../java/org/kududb/client/TestKuduClient.java  |  79 +++++
 .../kududb/mapreduce/KuduTableInputFormat.java  | 208 ++++++------
 .../mapreduce/KuduTableMapReduceUtil.java       |  56 +++-
 .../kududb/mapreduce/TestInputFormatJob.java    |  20 +-
 .../mapreduce/TestKuduTableInputFormat.java     |   8 +-
 14 files changed, 593 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index e8690e4..017cb6c 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -53,6 +53,24 @@ By combining all of these properties, Kudu targets support for families of
 applications that are difficult or impossible to implement on current-generation
 Hadoop storage technologies.
 
+[[rn_0.9.0]]
+=== Release notes specific to 0.9.0
+
+[[rn_0.9.0_incompatible_changes]]
+==== Incompatible changes
+
+- The KuduTableInputFormat has changed how it handles scan predicates, including
+  how it serializes predicates to the job configuration object. The new
+  configuration key is "kudu.mapreduce.encoded.predicate". Clients using the
+  TableInputFormatConfigurator should not be affected.
+
+[[rn_0.9.0_new_features]]
+==== New features
+
+- link:https://issues.apache.org/jira/browse/KUDU-1306[KUDU-1306] Scan token API
+  for creating partition-aware scan descriptors. Can be used by clients and
+  query engines to more easily execute parallel scans.
+
 [[rn_0.8.0]]
 === Release notes specific to 0.8.0
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
index ae65aaf..0b1e60b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
@@ -86,7 +86,9 @@ public abstract class AbstractKuduScannerBuilder
    * @param predicateBytes predicates to add
    * @return this instance
    * @throws IllegalArgumentException thrown when the passed bytes aren't valid
+   * @deprecated use {@link #addPredicate}
    */
+  @Deprecated
   public S addColumnRangePredicatesRaw(byte[] predicateBytes) {
     for (Tserver.ColumnRangePredicatePB pb : ColumnRangePredicate.fromByteArray(predicateBytes)) {
       addPredicate(ColumnRangePredicate.fromPb(pb).toKuduPredicate());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 51a6dd2..d7c5b4d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -676,7 +676,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * <p>
    * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback and
    * {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code Deferred}
-   * returned by {@link #locateTablet(String, byte[])}.
+   * returned by {@link #locateTablet(KuduTable, byte[])}.
    * @param <R> RPC's return type.
    * @param <D> Previous query's return type, which we don't use, but need to specify in order to
    *           tie it all together.
@@ -700,7 +700,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * <p>
    * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback" and
    * {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
-   * {@link #locateTablet(String, byte[])}.
+   * {@link #locateTablet(KuduTable, byte[])}.
    * @see #delayedSendRpcToTablet(KuduRpc, KuduException)
    * @param <R> The type of the original RPC.
    */

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
index 624536e..2c6554d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
@@ -1080,7 +1080,7 @@ public final class Bytes {
   }
 
   /**
-   * Utility methd to write a byte array to a data output. Equivalent of doing a writeInt of the
+   * Utility method to write a byte array to a data output. Equivalent of doing a writeInt of the
    * length followed by a write of the byte array. Convert back with {@link #readByteArray}
    * @param dataOutput
    * @param b

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
index a2003b6..6ca0939 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
@@ -198,15 +198,25 @@ public class KuduClient implements AutoCloseable {
 
   /**
    * Creates a new {@link KuduScanner.KuduScannerBuilder} for a particular table.
-   * @param table the name of the table you intend to scan.
+   * @param table the table you intend to scan.
    * The string is assumed to use the platform's default charset.
-   * @return a new scanner builder for this table
+   * @return a new scanner builder for the table
    */
   public KuduScanner.KuduScannerBuilder newScannerBuilder(KuduTable table) {
     return new KuduScanner.KuduScannerBuilder(asyncClient, table);
   }
 
   /**
+   * Creates a new {@link KuduScanToken.KuduScanTokenBuilder} for a particular table.
+   * Used for integrations with compute frameworks.
+   * @param table the table you intend to scan
+   * @return a new scan token builder for the table
+   */
+  public KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder(KuduTable table) {
+    return new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+  }
+
+  /**
    * Analogous to {@link #shutdown()}.
    * @throws Exception if an error happens while closing the connections
    */

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
index 32045ba..dea94c4 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
@@ -427,7 +427,8 @@ public class KuduPredicate {
    * Convert the predicate to the protobuf representation.
    * @return the protobuf message for this predicate.
    */
-  Common.ColumnPredicatePB toPB() {
+  @InterfaceAudience.Private
+  public Common.ColumnPredicatePB toPB() {
     Common.ColumnPredicatePB.Builder builder = Common.ColumnPredicatePB.newBuilder();
     builder.setColumn(column.getName());
 
@@ -462,7 +463,8 @@ public class KuduPredicate {
    * Convert a column predicate protobuf message into a predicate.
    * @return a predicate
    */
-  static KuduPredicate fromPB(Schema schema, Common.ColumnPredicatePB pb) {
+  @InterfaceAudience.Private
+  public static KuduPredicate fromPB(Schema schema, Common.ColumnPredicatePB pb) {
     ColumnSchema column = schema.getColumn(pb.getColumn());
     switch (pb.getPredicateCase()) {
       case EQUALITY: {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
new file mode 100644
index 0000000..382a4f0
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.kududb.client;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.ZeroCopyLiteralByteString;
+import org.kududb.ColumnSchema;
+import org.kududb.Common;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.Client.ScanTokenPB;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A scan token describes a partial scan of a Kudu table limited to a single
+ * contiguous physical location. Using the {@link KuduScanTokenBuilder}, clients can
+ * describe the desired scan, including predicates, bounds, timestamps, and
+ * caching, and receive back a collection of scan tokens.
+ *
+ * Each scan token may be separately turned into a scanner using
+ * {@link #intoScanner}, with each scanner responsible for a disjoint section
+ * of the table.
+ *
+ * Scan tokens may be serialized using the {@link #serialize} method and
+ * deserialized back into a scanner using the {@link #deserializeIntoScanner}
+ * method. This allows use cases such as generating scan tokens in the planner
+ * component of a query engine, then sending the tokens to execution nodes based
+ * on locality, and then instantiating the scanners on those nodes.
+ *
+ * Scan token locality information can be inspected using the {@link #getTablet}
+ * method.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class KuduScanToken implements Comparable<KuduScanToken> {
+  private final LocatedTablet tablet;
+  private final ScanTokenPB message;
+
+  private KuduScanToken(LocatedTablet tablet, ScanTokenPB message) {
+    this.tablet = tablet;
+    this.message = message;
+  }
+
+  /**
+   * Returns the tablet which the scanner created from this token will access.
+   * @return the located tablet
+   */
+  public LocatedTablet getTablet() {
+    return tablet;
+  }
+
+  /**
+   * Creates a {@link KuduScanner} from this scan token.
+   * @param client a Kudu client for the cluster
+   * @return a scanner for the scan token
+   */
+  public KuduScanner intoScanner(KuduClient client) throws Exception {
+    return pbIntoScanner(message, client);
+  }
+
+  /**
+   * Serializes this {@code KuduScanToken} into a byte array.
+   * @return the serialized scan token
+   * @throws IOException
+   */
+  public byte[] serialize() throws IOException {
+    byte[] buf = new byte[message.getSerializedSize()];
+    CodedOutputStream cos = CodedOutputStream.newInstance(buf);
+    message.writeTo(cos);
+    cos.flush();
+    return buf;
+  }
+
+  /**
+   * Deserializes a {@code KuduScanToken} into a {@link KuduScanner}.
+   * @param buf a byte array containing the serialized scan token.
+   * @param client a Kudu client for the cluster
+   * @return a scanner for the serialized scan token
+   * @throws Exception
+   */
+  public static KuduScanner deserializeIntoScanner(byte[] buf, KuduClient client) throws Exception {
+    return pbIntoScanner(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), client);
+  }
+
+  private static KuduScanner pbIntoScanner(ScanTokenPB message,
+                                           KuduClient client) throws Exception {
+    Preconditions.checkArgument(
+        !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown),
+        "Scan token requires an unsupported feature. This Kudu client must be updated.");
+
+    KuduTable table = client.openTable(message.getTableName());
+    KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
+
+    List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
+    for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
+      int columnIdx = table.getSchema().getColumnIndex(column.getName());
+      ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
+      Preconditions.checkArgument(column.getType() == schema.getType().getDataType(),
+                                  String.format("Column types do not match for column %s",
+                                                column.getName()));
+      columns.add(columnIdx);
+    }
+    builder.setProjectedColumnIndexes(columns);
+
+    for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) {
+      builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred));
+    }
+
+    if (message.hasLowerBoundPrimaryKey()) {
+      builder.lowerBoundRaw(message.getLowerBoundPrimaryKey().toByteArray());
+    }
+    if (message.hasUpperBoundPrimaryKey()) {
+      builder.exclusiveUpperBoundRaw(message.getUpperBoundPrimaryKey().toByteArray());
+    }
+
+    if (message.hasLowerBoundPartitionKey()) {
+      builder.lowerBoundPartitionKeyRaw(message.getLowerBoundPartitionKey().toByteArray());
+    }
+    if (message.hasUpperBoundPartitionKey()) {
+      builder.exclusiveUpperBoundPartitionKeyRaw(message.getUpperBoundPartitionKey().toByteArray());
+    }
+
+    if (message.hasLimit()) {
+      builder.limit(message.getLimit());
+    }
+
+    if (message.hasFaultTolerant()) {
+      // TODO(KUDU-1040)
+    }
+
+    if (message.hasReadMode()) {
+      switch (message.getReadMode()) {
+        case READ_AT_SNAPSHOT: {
+          builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
+          if (message.hasSnapTimestamp()) {
+            builder.snapshotTimestampRaw(message.getSnapTimestamp());
+          }
+          break;
+        }
+        case READ_LATEST: {
+          builder.readMode(AsyncKuduScanner.ReadMode.READ_LATEST);
+          break;
+        }
+        default: throw new IllegalArgumentException("unknown read mode");
+      }
+    }
+
+    if (message.hasPropagatedTimestamp()) {
+      // TODO (KUDU-1411)
+    }
+
+    if (message.hasCacheBlocks()) {
+      builder.cacheBlocks(message.getCacheBlocks());
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public int compareTo(KuduScanToken other) {
+    if (!message.getTableName().equals(other.message.getTableName())) {
+      throw new IllegalArgumentException("Scan tokens from different tables may not be compared");
+    }
+
+    return tablet.getPartition().compareTo(other.getTablet().getPartition());
+  }
+
+  /**
+   * Builds a sequence of scan tokens.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public static class KuduScanTokenBuilder
+      extends AbstractKuduScannerBuilder<KuduScanTokenBuilder, List<KuduScanToken>> {
+
+    private long timeout;
+
+    KuduScanTokenBuilder(AsyncKuduClient client, KuduTable table) {
+      super(client, table);
+      timeout = client.getDefaultOperationTimeoutMs();
+    }
+
+    /**
+     * Sets a timeout value to use when building the list of scan tokens. If
+     * unset, the client operation timeout will be used.
+     * @param timeoutMs the timeout in milliseconds.
+     */
+    public KuduScanTokenBuilder setTimeout(long timeoutMs) {
+      timeout = timeoutMs;
+      return this;
+    }
+
+    @Override
+    public List<KuduScanToken> build() {
+      if (lowerBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY ||
+          upperBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY) {
+        throw new IllegalArgumentException(
+            "Partition key bounds may not be set on KuduScanTokenBuilder");
+      }
+
+      // If the scan is short-circuitable, then return no tokens.
+      for (KuduPredicate predicate : predicates.values()) {
+        if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
+          return ImmutableList.of();
+        }
+      }
+
+      Client.ScanTokenPB.Builder proto = Client.ScanTokenPB.newBuilder();
+
+      proto.setTableName(table.getName());
+
+      // Map the column names or indices to actual columns in the table schema.
+      // If the user did not set either projection, then scan all columns.
+      if (projectedColumnNames != null) {
+        for (String columnName : projectedColumnNames) {
+          ColumnSchema columnSchema = table.getSchema().getColumn(columnName);
+          Preconditions.checkArgument(columnSchema != null, "unknown column %s", columnName);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+        }
+      } else if (projectedColumnIndexes != null) {
+        for (int columnIdx : projectedColumnIndexes) {
+          ColumnSchema columnSchema = table.getSchema().getColumnByIndex(columnIdx);
+          Preconditions.checkArgument(columnSchema != null, "unknown column index %s", columnIdx);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+        }
+      } else {
+        for (ColumnSchema column : table.getSchema().getColumns()) {
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), column);
+        }
+      }
+
+      for (KuduPredicate predicate : predicates.values()) {
+        proto.addColumnPredicates(predicate.toPB());
+      }
+
+      if (lowerBoundPrimaryKey != AsyncKuduClient.EMPTY_ARRAY && lowerBoundPrimaryKey.length > 0) {
+        proto.setLowerBoundPrimaryKey(ZeroCopyLiteralByteString.copyFrom(lowerBoundPrimaryKey));
+      }
+      if (upperBoundPrimaryKey != AsyncKuduClient.EMPTY_ARRAY && upperBoundPrimaryKey.length > 0) {
+        proto.setUpperBoundPrimaryKey(ZeroCopyLiteralByteString.copyFrom(upperBoundPrimaryKey));
+      }
+      if (lowerBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY &&
+          lowerBoundPartitionKey.length > 0) {
+        proto.setLowerBoundPartitionKey(ZeroCopyLiteralByteString.copyFrom(lowerBoundPartitionKey));
+      }
+      if (upperBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY &&
+          upperBoundPartitionKey.length > 0) {
+        proto.setUpperBoundPartitionKey(ZeroCopyLiteralByteString.copyFrom(upperBoundPartitionKey));
+      }
+
+      proto.setLimit(limit);
+      proto.setReadMode(readMode.pbVersion());
+
+      // If the last propagated timestamp is set send it with the scan.
+      if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
+        proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());
+      }
+
+      // If the mode is set to read on snapshot set the snapshot timestamp.
+      if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT &&
+          htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+        proto.setSnapTimestamp(htTimestamp);
+      }
+
+      proto.setCacheBlocks(cacheBlocks);
+
+      try {
+        List<LocatedTablet> tablets;
+        if (table.getPartitionSchema().isSimpleRangePartitioning()) {
+          // TODO: replace this with proper partition pruning.
+          tablets = table.getTabletsLocations(
+              lowerBoundPrimaryKey.length == 0 ? null : lowerBoundPrimaryKey,
+              upperBoundPrimaryKey.length == 0 ? null : upperBoundPrimaryKey,
+              timeout);
+        } else {
+          tablets = table.getTabletsLocations(timeout);
+        }
+
+        List<KuduScanToken> tokens = new ArrayList<>(tablets.size());
+        for (LocatedTablet tablet : tablets) {
+          Client.ScanTokenPB.Builder builder = proto.clone();
+          builder.setLowerBoundPartitionKey(
+              ZeroCopyLiteralByteString.wrap(tablet.getPartition().partitionKeyStart));
+          builder.setUpperBoundPartitionKey(
+              ZeroCopyLiteralByteString.wrap(tablet.getPartition().partitionKeyEnd));
+          tokens.add(new KuduScanToken(tablet, builder.build()));
+        }
+        return tokens;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
index 28caa77..e25af9b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
@@ -46,11 +46,11 @@ public class LocatedTablet {
     this.partition = ProtobufHelper.pbToPartition(pb.getPartition());
     this.tabletId = pb.getTabletId().toByteArray();
 
-    List<Replica> reps = Lists.newArrayList();
+    ImmutableList.Builder<Replica> reps = ImmutableList.builder();
     for (ReplicaPB repPb : pb.getReplicasList()) {
       reps.add(new Replica(repPb));
     }
-    this.replicas = ImmutableList.copyOf(reps);
+    this.replicas = reps.build();
   }
 
   public List<Replica> getReplicas() {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
index 7014630..996c746 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
@@ -104,7 +104,9 @@ public class BaseKuduTest {
         // shutting down the async client effectively does that.
       }
     } finally {
-      miniCluster.shutdown();
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
index 7ff8a18..3893062 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
@@ -26,17 +26,24 @@ import static org.kududb.client.KuduPredicate.ComparisonOp.LESS;
 import static org.kududb.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
 import static org.kududb.client.RowResult.timestampToString;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.kududb.ColumnSchema;
 import org.kududb.Schema;
 import org.kududb.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestKuduClient extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
   private String tableName;
 
   @Before
@@ -309,6 +316,78 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Tests scan tokens by creating a set of scan tokens, serializing them, and
+   * then executing them in parallel with separate client instances. This
+   * simulates the normal usecase of scan tokens being created at a central
+   * planner and distributed to remote task executors.
+   */
+  @Test
+  public void testScanTokens() throws Exception {
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(ImmutableList.of("key"), 8);
+
+    PartialRow splitRow = schema.newPartialRow();
+    splitRow.addString("key", "key_50");
+    createOptions.addSplitRow(splitRow);
+
+    syncClient.createTable(tableName, schema, createOptions);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    KuduTable table = syncClient.openTable(tableName);
+    for (int i = 0; i < 100; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      row.addString("c1", "c1_" + i);
+      row.addString("c2", "c2_" + i);
+      session.apply(insert);
+    }
+    session.flush();
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(16, tokens.size());
+
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+                                                  .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+                                                  .build()) {
+            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              assertTrue(localCount > 0);
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    assertEquals(100, count.get());
+  }
+
+  /**
    * Creates a local client that we auto-close while buffering one row, then makes sure that after
    * closing that we can read the row.
    */

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
index 4b6187c..25235cb 100644
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
@@ -17,28 +17,13 @@ package org.kududb.mapreduce;
 import com.google.common.base.Objects;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import org.apache.commons.net.util.Base64;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
+import com.google.common.primitives.UnsignedBytes;
 
-import javax.naming.NamingException;
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -46,6 +31,35 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.naming.NamingException;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
+import org.kududb.Common;
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.Bytes;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduTable;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.KuduScanToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -65,9 +79,7 @@ import java.util.Map;
 public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     implements Configurable {
 
-  private static final Log LOG = LogFactory.getLog(KuduTableInputFormat.class);
-
-  private static final long SLEEP_TIME_FOR_RETRIES_MS = 1000;
+  private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
 
   /** Job parameter that specifies the input table. */
   static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
@@ -84,9 +96,9 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
   /** Job parameter that specifies the address for the name server. */
   static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
 
-  /** Job parameter that specifies the encoded column range predicates (may be empty). */
-  static final String ENCODED_COLUMN_RANGE_PREDICATES_KEY =
-      "kudu.mapreduce.encoded.column.range.predicates";
+  /** Job parameter that specifies the encoded column predicates (may be empty). */
+  static final String ENCODED_PREDICATES_KEY =
+      "kudu.mapreduce.encoded.predicates";
 
   /**
    * Job parameter that specifies the column projection as a comma-separated list of column names.
@@ -102,7 +114,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
    * The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
    * used in order to not do DNS lookups multiple times for each tablet server.
    */
-  private final Map<String, String> reverseDNSCacheMap = new HashMap<String, String>();
+  private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
 
   private Configuration conf;
   private KuduClient client;
@@ -111,7 +123,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
   private String nameServer;
   private boolean cacheBlocks;
   private List<String> projectedCols;
-  private byte[] rawPredicates;
+  private List<KuduPredicate> predicates;
 
   @Override
   public List<InputSplit> getSplits(JobContext jobContext)
@@ -120,57 +132,25 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       if (table == null) {
         throw new IOException("No table was provided");
       }
-      List<InputSplit> splits;
-      DeadlineTracker deadline = new DeadlineTracker();
-      deadline.setDeadline(operationTimeoutMs);
-      // If the job is started while a leader election is running, we might not be able to find a
-      // leader right away. We'll wait as long as the user is willing to wait with the operation
-      // timeout, and once we've waited long enough we just start picking the first replica we see
-      // for those tablets that don't have a leader. The client will later try to find the leader
-      // and it might fail, in which case the task will get retried.
-      retryloop:
-      while (true) {
-        List<LocatedTablet> locations;
-        try {
-          locations = table.getTabletsLocations(operationTimeoutMs);
-        } catch (Exception e) {
-          throw new IOException("Could not get the tablets locations", e);
-        }
 
-        if (locations.isEmpty()) {
-          throw new IOException("The requested table has 0 tablets, cannot continue");
-        }
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
+                                                      .setProjectedColumnNames(projectedCols)
+                                                      .cacheBlocks(cacheBlocks)
+                                                      .setTimeout(operationTimeoutMs);
+      for (KuduPredicate predicate : predicates) {
+        tokenBuilder.addPredicate(predicate);
+      }
+      List<KuduScanToken> tokens = tokenBuilder.build();
 
-        // For the moment we only pass the leader since that's who we read from.
-        // If we've been trying to get a leader for each tablet for too long, we stop looping
-        // and just finish with what we have.
-        splits = new ArrayList<InputSplit>(locations.size());
-        for (LocatedTablet locatedTablet : locations) {
-          List<String> addresses = Lists.newArrayList();
-          LocatedTablet.Replica replica = locatedTablet.getLeaderReplica();
-          if (replica == null) {
-            if (deadline.wouldSleepingTimeout(SLEEP_TIME_FOR_RETRIES_MS)) {
-              LOG.debug("We ran out of retries, picking a non-leader replica for this tablet: " +
-                  locatedTablet.toString());
-              // We already checked it's not empty.
-              replica = locatedTablet.getReplicas().get(0);
-            } else {
-              LOG.debug("Retrying creating the splits because this tablet is missing a leader: " +
-                  locatedTablet.toString());
-              Thread.sleep(SLEEP_TIME_FOR_RETRIES_MS);
-              continue retryloop;
-            }
-          }
-          addresses.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
-          String[] addressesArray = addresses.toArray(new String[addresses.size()]);
-          Partition partition = locatedTablet.getPartition();
-          TableSplit split = new TableSplit(partition.getPartitionKeyStart(),
-                                            partition.getPartitionKeyEnd(),
-                                            addressesArray);
-          splits.add(split);
+      List<InputSplit> splits = new ArrayList<>(tokens.size());
+      for (KuduScanToken token : tokens) {
+        List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+        for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+          locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
         }
-        return splits;
+        splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
       }
+      return splits;
     } finally {
       shutdownClient();
     }
@@ -226,13 +206,13 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     String tableName = conf.get(INPUT_TABLE_KEY);
     String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
     this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
-        AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+                                           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
     this.nameServer = conf.get(NAME_SERVER_KEY);
     this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
 
     this.client = new KuduClient.KuduClientBuilder(masterAddresses)
-        .defaultOperationTimeoutMs(operationTimeoutMs)
-        .build();
+                                .defaultOperationTimeoutMs(operationTimeoutMs)
+                                .build();
     try {
       this.table = client.openTable(tableName);
     } catch (Exception ex) {
@@ -259,8 +239,17 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
     }
 
-    String encodedPredicates = conf.get(ENCODED_COLUMN_RANGE_PREDICATES_KEY, "");
-    rawPredicates = Base64.decodeBase64(encodedPredicates);
+    this.predicates = new ArrayList<>();
+    try {
+      InputStream is =
+          new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
+      while (is.available() > 0) {
+        this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
+                                                 Common.ColumnPredicatePB.parseDelimitedFrom(is)));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("unable to deserialize predicates from the configuration", e);
+    }
   }
 
   /**
@@ -284,18 +273,31 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
 
   static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
 
-    private byte[] startPartitionKey;
-    private byte[] endPartitionKey;
+    /** The scan token that the split will use to scan the Kudu table. */
+    private byte[] scanToken;
+
+    /** The start partition key of the scan. Used for sorting splits. */
+    private byte[] partitionKey;
+
+    /** Tablet server locations which host the tablet to be scanned. */
     private String[] locations;
 
     public TableSplit() { } // Writable
 
-    public TableSplit(byte[] startPartitionKey, byte[] endPartitionKey, String[] locations) {
-      this.startPartitionKey = startPartitionKey;
-      this.endPartitionKey = endPartitionKey;
+    public TableSplit(KuduScanToken token, String[] locations) throws IOException {
+      this.scanToken = token.serialize();
+      this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
       this.locations = locations;
     }
 
+    public byte[] getScanToken() {
+      return scanToken;
+    }
+
+    public byte[] getPartitionKey() {
+      return partitionKey;
+    }
+
     @Override
     public long getLength() throws IOException, InterruptedException {
       // TODO Guesstimate a size
@@ -307,23 +309,15 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       return locations;
     }
 
-    public byte[] getStartPartitionKey() {
-      return startPartitionKey;
-    }
-
-    public byte[] getEndPartitionKey() {
-      return endPartitionKey;
-    }
-
     @Override
-    public int compareTo(TableSplit tableSplit) {
-      return Bytes.memcmp(startPartitionKey, tableSplit.getStartPartitionKey());
+    public int compareTo(TableSplit other) {
+      return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
     }
 
     @Override
     public void write(DataOutput dataOutput) throws IOException {
-      Bytes.writeByteArray(dataOutput, startPartitionKey);
-      Bytes.writeByteArray(dataOutput, endPartitionKey);
+      Bytes.writeByteArray(dataOutput, scanToken);
+      Bytes.writeByteArray(dataOutput, partitionKey);
       dataOutput.writeInt(locations.length);
       for (String location : locations) {
         byte[] str = Bytes.fromString(location);
@@ -333,8 +327,8 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
 
     @Override
     public void readFields(DataInput dataInput) throws IOException {
-      startPartitionKey = Bytes.readByteArray(dataInput);
-      endPartitionKey = Bytes.readByteArray(dataInput);
+      scanToken = Bytes.readByteArray(dataInput);
+      partitionKey = Bytes.readByteArray(dataInput);
       locations = new String[dataInput.readInt()];
       for (int i = 0; i < locations.length; i++) {
         byte[] str = Bytes.readByteArray(dataInput);
@@ -344,8 +338,8 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
 
     @Override
     public int hashCode() {
-      // We currently just care about the row key since we're within the same table
-      return Arrays.hashCode(startPartitionKey);
+      // We currently just care about the partition key since we're within the same table.
+      return Arrays.hashCode(partitionKey);
     }
 
     @Override
@@ -361,8 +355,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     @Override
     public String toString() {
       return Objects.toStringHelper(this)
-                    .add("startPartitionKey", Bytes.pretty(startPartitionKey))
-                    .add("endPartitionKey", Bytes.pretty(endPartitionKey))
+                    .add("partitionKey", Bytes.pretty(partitionKey))
                     .add("locations", Arrays.toString(locations))
                     .toString();
     }
@@ -383,13 +376,12 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
 
       split = (TableSplit) inputSplit;
-      scanner = client.newScannerBuilder(table)
-          .setProjectedColumnNames(projectedCols)
-          .lowerBoundPartitionKeyRaw(split.getStartPartitionKey())
-          .exclusiveUpperBoundPartitionKeyRaw(split.getEndPartitionKey())
-          .cacheBlocks(cacheBlocks)
-          .addColumnRangePredicatesRaw(rawPredicates)
-          .build();
+
+      try {
+        scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
 
       // Calling this now to set iterator.
       tryRefreshIterator();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
index c8fa5e9..e9d0162 100644
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
+++ b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
@@ -14,6 +14,20 @@
  */
 package org.kududb.mapreduce;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.net.util.Base64;
@@ -29,16 +43,10 @@ import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 import org.kududb.client.AsyncKuduClient;
 import org.kududb.client.ColumnRangePredicate;
+import org.kududb.client.KuduPredicate;
 import org.kududb.client.KuduTable;
 import org.kududb.client.Operation;
 
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.*;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
 /**
  * Utility class to setup MR jobs that use Kudu as an input and/or output.
  */
@@ -145,7 +153,7 @@ public class KuduTableMapReduceUtil {
     protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
     protected final String columnProjection;
     protected boolean cacheBlocks;
-    protected List<ColumnRangePredicate> columnRangePredicates = new ArrayList<>();
+    protected List<KuduPredicate> predicates = new ArrayList<>();
 
     /**
      * Constructor for the required fields to configure.
@@ -188,10 +196,7 @@ public class KuduTableMapReduceUtil {
         conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
       }
 
-      if (!columnRangePredicates.isEmpty()) {
-        conf.set(KuduTableInputFormat.ENCODED_COLUMN_RANGE_PREDICATES_KEY,
-            base64EncodePredicates(columnRangePredicates));
-      }
+      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
 
       if (addDependencies) {
         addDependencyJars(job);
@@ -199,9 +204,17 @@ public class KuduTableMapReduceUtil {
     }
   }
 
-  static String base64EncodePredicates(List<ColumnRangePredicate> predicates) {
-    byte[] predicateBytes = ColumnRangePredicate.toByteArray(predicates);
-    return Base64.encodeBase64String(predicateBytes);
+  /**
+   * Returns the provided predicates as a Base64 encoded string.
+   * @param predicates the predicates to encode
+   * @return the encoded predicates
+   */
+  static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    for (KuduPredicate predicate : predicates) {
+      predicate.toPB().writeDelimitedTo(baos);
+    }
+    return Base64.encodeBase64String(baos.toByteArray());
   }
 
 
@@ -297,9 +310,20 @@ public class KuduTableMapReduceUtil {
      * Adds a new predicate that will be pushed down to all the tablets.
      * @param predicate a predicate to add
      * @return this instance
+     * @deprecated use {@link #addPredicate}
      */
+    @Deprecated
     public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
-      this.columnRangePredicates.add(predicate);
+      return addPredicate(predicate.toKuduPredicate());
+    }
+
+    /**
+     * Adds a new predicate that will be pushed down to all the tablets.
+     * @param predicate a predicate to add
+     * @return this instance
+     */
+    public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
+      this.predicates.add(predicate);
       return this;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
index 9f416fc..de15645 100644
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
+++ b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
@@ -17,7 +17,6 @@
 package org.kududb.mapreduce;
 
 import com.google.common.collect.Lists;
-import org.kududb.client.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -26,6 +25,9 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.RowResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,19 +71,19 @@ public class TestInputFormatJob extends BaseKuduTest {
     Configuration conf = new Configuration();
     HADOOP_UTIL.setupAndGetTestDir(TestInputFormatJob.class.getName(), conf).getAbsolutePath();
 
-    createAndTestJob(conf, new ArrayList<ColumnRangePredicate>(), 9);
+    createAndTestJob(conf, new ArrayList<KuduPredicate>(), 9);
 
-    ColumnRangePredicate pred1 = new ColumnRangePredicate(basicSchema.getColumnByIndex(0));
-    pred1.setLowerBound(20);
+    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+        basicSchema.getColumnByIndex(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 20);
     createAndTestJob(conf, Lists.newArrayList(pred1), 6);
 
-    ColumnRangePredicate pred2 = new ColumnRangePredicate(basicSchema.getColumnByIndex(2));
-    pred2.setUpperBound(1);
+    KuduPredicate pred2 = KuduPredicate.newComparisonPredicate(
+        basicSchema.getColumnByIndex(2), KuduPredicate.ComparisonOp.LESS_EQUAL, 1);
     createAndTestJob(conf, Lists.newArrayList(pred1, pred2), 2);
   }
 
   private void createAndTestJob(Configuration conf,
-                                List<ColumnRangePredicate> predicates, int expectedCount)
+                                List<KuduPredicate> predicates, int expectedCount)
       throws Exception {
     String jobName = TestInputFormatJob.class.getName();
     Job job = new Job(conf, jobName);
@@ -100,8 +102,8 @@ public class TestInputFormatJob extends BaseKuduTest {
             .operationTimeoutMs(DEFAULT_SLEEP)
             .addDependencies(false)
             .cacheBlocks(false);
-    for (ColumnRangePredicate predicate : predicates) {
-      configurator.addColumnRangePredicate(predicate);
+    for (KuduPredicate predicate : predicates) {
+      configurator.addPredicate(predicate);
     }
     configurator.configure();
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
index d7942c0..3a13825 100644
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
@@ -100,14 +100,14 @@ public class TestKuduTableInputFormat extends BaseKuduTest {
     }
 
     // Test using a predicate that filters the row out.
-    ColumnRangePredicate pred1 = new ColumnRangePredicate(schema.getColumnByIndex(1));
-    pred1.setLowerBound(3);
+    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+        schema.getColumnByIndex(1), KuduPredicate.ComparisonOp.GREATER_EQUAL, 3);
     reader = createRecordReader("*", Lists.newArrayList(pred1));
     assertFalse(reader.nextKeyValue());
   }
 
   private RecordReader<NullWritable, RowResult> createRecordReader(String columnProjection,
-        List<ColumnRangePredicate> predicates) throws IOException, InterruptedException {
+        List<KuduPredicate> predicates) throws IOException, InterruptedException {
     KuduTableInputFormat input = new KuduTableInputFormat();
     Configuration conf = new Configuration();
     conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
@@ -117,7 +117,7 @@ public class TestKuduTableInputFormat extends BaseKuduTest {
     }
     if (predicates != null) {
       String encodedPredicates = KuduTableMapReduceUtil.base64EncodePredicates(predicates);
-      conf.set(KuduTableInputFormat.ENCODED_COLUMN_RANGE_PREDICATES_KEY, encodedPredicates);
+      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, encodedPredicates);
     }
     input.setConf(conf);
     List<InputSplit> splits = input.getSplits(null);