You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/07 15:06:40 UTC

incubator-kudu git commit: KUDU-1309: [java client] support tables with non-covering partition-key ranges

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 3692233b8 -> a8a2e1983


KUDU-1309: [java client] support tables with non-covering partition-key ranges

This commit adds support for accessing and creating tables with non-covering
range partitions to the Java client. The only public interface introduced is the
CreateTableOptions.addRangeBound method; all the other changes are those
necessary for reading from or writing to tables with range partition gaps. Right
now, if a client attempts to write to a non-covered range, a NotFound status is
returned, which matches the C++ client. JD pointed out that this makes it
impossible for applications to distinguish between updates that failed because
the row doesn't exist and updates that failed because the partition range
doesn't exist. I plan to fix this by introducing a new status code and fixing
both clients in a follow-up commit.

Change-Id: Id65a1f8a95bb16fc0360a17021a391afd3c9d03f
Reviewed-on: http://gerrit.cloudera.org:8080/3388
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/a8a2e198
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/a8a2e198
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/a8a2e198

Branch: refs/heads/master
Commit: a8a2e1983392370358610c5ea237423efaa11c12
Parents: 3692233
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon May 23 17:52:21 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Jul 6 01:56:41 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/AsyncKuduClient.java | 170 ++++++++++++++-----
 .../org/kududb/client/AsyncKuduScanner.java     |  78 ++++++---
 .../org/kududb/client/AsyncKuduSession.java     |  63 ++++++-
 .../src/main/java/org/kududb/client/Batch.java  |   2 +-
 .../java/org/kududb/client/BatchResponse.java   |   9 +
 .../src/main/java/org/kududb/client/Bytes.java  |  13 +-
 .../org/kududb/client/CreateTableOptions.java   |  39 ++++-
 .../org/kududb/client/CreateTableRequest.java   |  11 ++
 .../main/java/org/kududb/client/KuduRpc.java    |  15 +-
 .../java/org/kududb/client/KuduRpcResponse.java |  11 +-
 .../org/kududb/client/NonCoveredRangeCache.java | 104 ++++++++++++
 .../kududb/client/NonCoveredRangeException.java |  52 ++++++
 .../main/java/org/kududb/client/Operation.java  |  29 +++-
 .../org/kududb/client/OperationResponse.java    |   4 +-
 .../main/java/org/kududb/client/Partition.java  |   4 +-
 .../main/java/org/kududb/client/RowError.java   |  18 +-
 .../org/kududb/client/RowResultIterator.java    |   9 +
 .../java/org/kududb/client/BaseKuduTest.java    |  31 ++++
 .../kududb/client/TestFlexiblePartitioning.java |  74 +++++++-
 .../java/org/kududb/client/TestKuduClient.java  |  63 +++++++
 .../java/org/kududb/client/TestKuduSession.java | 121 ++++++++++++-
 .../java/org/kududb/client/TestKuduTable.java   |  57 ++++++-
 22 files changed, 839 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index c51115b..52a5fdb 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -82,6 +82,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -169,6 +170,16 @@ public class AsyncKuduClient implements AutoCloseable {
       new ConcurrentHashMap<>();
 
   /**
+   * Map of table ID to non-covered range cache.
+   *
+   * TODO: Currently once a non-covered range is added to the cache, it is never
+   * removed. Once adding range partitions becomes possible entries will need to
+   * be expired.
+   */
+  private final ConcurrentMap<String, NonCoveredRangeCache> nonCoveredRangeCaches =
+      new ConcurrentHashMap<>();
+
+  /**
    * Cache that maps a TabletServer address ("ip:port") to the clients
    * connected to it.
    * <p>
@@ -566,27 +577,6 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Package-private access point for {@link AsyncKuduScanner}s to open themselves.
-   * @param scanner The scanner to open.
-   * @return A deferred {@link AsyncKuduScanner.Response}
-   */
-  Deferred<AsyncKuduScanner.Response> openScanner(final AsyncKuduScanner scanner) {
-    return sendRpcToTablet(scanner.getOpenRequest()).addErrback(
-        new Callback<Exception, Exception>() {
-          public Exception call(final Exception e) {
-            String message = "Cannot openScanner because: ";
-            LOG.warn(message, e);
-            // Don't let the scanner think it's opened on this tablet.
-            scanner.invalidate();
-            return e;  // Let the error propagate.
-          }
-          public String toString() {
-            return "openScanner errback";
-          }
-        });
-  }
-
-  /**
    * Create a new session for interacting with the cluster.
    * User is responsible for destroying the session object.
    * This is a fully local operation (no RPCs or blocking).
@@ -654,8 +644,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // Oops, we couldn't find a tablet server that hosts this tablet. Our
       // cache was probably invalidated while the client was scanning. So
       // we can't close this scanner properly.
-      LOG.warn("Cannot close " + scanner + " properly, no connection open for "
-          + (tablet == null ? null : tablet));
+      LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
       return Deferred.fromResult(null);
     }
     final KuduRpc<AsyncKuduScanner.Response>  close_request = scanner.getCloseRequest();
@@ -682,11 +671,18 @@ public class AsyncKuduClient implements AutoCloseable {
     }
     request.attempt++;
     final String tableId = request.getTable().getTableId();
-    byte[] partitionKey = null;
-    if (request instanceof KuduRpc.HasKey) {
-       partitionKey = ((KuduRpc.HasKey)request).partitionKey();
+    byte[] partitionKey = request.partitionKey();
+    RemoteTablet tablet = getTablet(tableId, partitionKey);
+
+    if (tablet == null && partitionKey != null) {
+      // Check if the RPC is in a non-covered range.
+      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, partitionKey);
+      if (nonCoveredRange != null) {
+        return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
+                                                               nonCoveredRange.getValue()));
+      }
+      // Otherwise fall through to below where a GetTableLocations lookup will occur.
     }
-    final RemoteTablet tablet = getTablet(tableId, partitionKey);
 
     // Set the propagated timestamp so that the next time we send a message to
     // the server the message includes the last propagated timestamp.
@@ -1069,7 +1065,7 @@ public class AsyncKuduClient implements AutoCloseable {
     } else {
       d = sendRpcToTablet(rpc);
     }
-    d.addCallback(new MasterLookupCB(table));
+    d.addCallback(new MasterLookupCB(table, partitionKey));
     if (has_permit) {
       d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
     }
@@ -1082,8 +1078,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return An initialized Deferred object to hold the response.
    */
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
-    final Deferred<Master.GetTableLocationsResponsePB> responseD =
-        new Deferred<Master.GetTableLocationsResponsePB>();
+    final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
     final GetMasterRegistrationReceived received =
         new GetMasterRegistrationReceived(masterAddresses, responseD);
     for (HostAndPort hostAndPort : masterAddresses) {
@@ -1140,6 +1135,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // The next partition key to look up. If null, then it represents
     // the minimum partition key, If empty, it represents the maximum key.
     byte[] partitionKey = startPartitionKey;
+    String tableId = table.getTableId();
 
     // Continue while the partition key is the minimum, or it is not the maximum
     // and it is less than the end partition key.
@@ -1147,13 +1143,19 @@ public class AsyncKuduClient implements AutoCloseable {
            (partitionKey.length > 0 &&
             (endPartitionKey == null || Bytes.memcmp(partitionKey, endPartitionKey) < 0))) {
       byte[] key = partitionKey == null ? EMPTY_ARRAY : partitionKey;
-      RemoteTablet tablet = getTablet(table.getTableId(), key);
+      RemoteTablet tablet = getTablet(tableId, key);
       if (tablet != null) {
         ret.add(new LocatedTablet(tablet));
         partitionKey = tablet.getPartition().getPartitionKeyEnd();
         continue;
       }
 
+      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, key);
+      if (nonCoveredRange != null) {
+        partitionKey = nonCoveredRange.getValue();
+        continue;
+      }
+
       if (deadlineTracker.timedOut()) {
         return Deferred.fromError(new NonRecoverableException(
             "Took too long getting the list of tablets, " + deadlineTracker));
@@ -1259,8 +1261,10 @@ public class AsyncKuduClient implements AutoCloseable {
   private final class MasterLookupCB implements Callback<Object,
       Master.GetTableLocationsResponsePB> {
     final KuduTable table;
-    MasterLookupCB(KuduTable table) {
+    private final byte[] partitionKey;
+    MasterLookupCB(KuduTable table, byte[] partitionKey) {
       this.table = table;
+      this.partitionKey = partitionKey;
     }
     public Object call(final GetTableLocationsResponsePB response) {
       if (response.hasError()) {
@@ -1273,6 +1277,10 @@ public class AsyncKuduClient implements AutoCloseable {
         }
       } else {
         discoverTablets(table, response.getTabletLocationsList());
+        if (partitionKey != null) {
+          discoverNonCoveredRangePartitions(table.getTableId(), partitionKey,
+                                            response.getTabletLocationsList());
+        }
       }
       return null;
     }
@@ -1311,8 +1319,8 @@ public class AsyncKuduClient implements AutoCloseable {
     ConcurrentSkipListMap<byte[], RemoteTablet> tablets = tabletsCache.get(tableId);
     if (tablets == null) {
       tablets = new ConcurrentSkipListMap<>(Bytes.MEMCMP);
-      ConcurrentSkipListMap<byte[], RemoteTablet> oldTablets = tabletsCache.putIfAbsent
-          (tableId, tablets);
+      ConcurrentSkipListMap<byte[], RemoteTablet> oldTablets =
+          tabletsCache.putIfAbsent(tableId, tablets);
       if (oldTablets != null) {
         tablets = oldTablets;
       }
@@ -1337,7 +1345,7 @@ public class AsyncKuduClient implements AutoCloseable {
         // someone beat us to it
         continue;
       }
-      LOG.info("Discovered tablet {} for table {} with partition {}",
+      LOG.info("Discovered tablet {} for table '{}' with partition {}",
                tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
       rt.refreshTabletClients(tabletPb);
       // This is making this tablet available
@@ -1347,6 +1355,53 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
+  private void discoverNonCoveredRangePartitions(String tableId,
+                                                 byte[] partitionKey,
+                                                 List<Master.TabletLocationsPB> locations) {
+    NonCoveredRangeCache nonCoveredRanges = nonCoveredRangeCaches.get(tableId);
+    if (nonCoveredRanges == null) {
+      nonCoveredRanges = new NonCoveredRangeCache();
+      NonCoveredRangeCache oldCache = nonCoveredRangeCaches.putIfAbsent(tableId, nonCoveredRanges);
+      if (oldCache != null) {
+        nonCoveredRanges = oldCache;
+      }
+    }
+
+    // If there are no locations, then the table has no tablets. This is
+    // guaranteed because we never set an upper bound on the GetTableLocations
+    // request, and the master will always return the tablet *before* the start
+    // of the request, if the start key falls in a non-covered range (see the
+    // comment on GetTableLocationsResponsePB in master.proto).
+    if (locations.isEmpty()) {
+      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, EMPTY_ARRAY);
+      return;
+    }
+
+    // If the first tablet occurs after the requested partition key,
+    // then there is an initial non-covered range.
+    byte[] firstStartKey = locations.get(0).getPartition().getPartitionKeyStart().toByteArray();
+    if (Bytes.memcmp(partitionKey, firstStartKey) < 0) {
+      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, firstStartKey);
+    }
+
+    byte[] previousEndKey = null;
+    for (Master.TabletLocationsPB location : locations) {
+      byte[] startKey = location.getPartition().getPartitionKeyStart().toByteArray();
+
+      // Check if there is a non-covered range between this tablet and the previous.
+      if (previousEndKey != null && Bytes.memcmp(previousEndKey, startKey) < 0) {
+        nonCoveredRanges.addNonCoveredRange(previousEndKey, startKey);
+      }
+      previousEndKey = location.getPartition().getPartitionKeyEnd().toByteArray();
+    }
+
+    if (previousEndKey.length > 0 && Bytes.memcmp(previousEndKey, partitionKey) <= 0) {
+      // This happens if the partition key falls in a non-covered range that
+      // is unbounded (to the right).
+      nonCoveredRanges.addNonCoveredRange(previousEndKey, EMPTY_ARRAY);
+    }
+  }
+
   RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
     Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
     Slice tabletId = new Slice(tabletPb.getTabletId().toByteArray());
@@ -1399,7 +1454,9 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param deadline deadline in milliseconds for this lookup to finish
    * @return a deferred containing the located tablet
    */
-  Deferred<LocatedTablet> getTabletLocation(KuduTable table, byte[] partitionKey, long deadline) {
+  Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
+                                            final byte[] partitionKey,
+                                            long deadline) {
     // Locate the tablets at the partition key by locating all tablets between
     // the partition key (inclusive), and the incremented partition key (exclusive).
 
@@ -1412,18 +1469,41 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     // Then pick out the single tablet result from the list.
-    return locatedTablets.addCallback(new Callback<LocatedTablet, List<LocatedTablet>>() {
-      @Override
-      public LocatedTablet call(List<LocatedTablet> tablets) {
-        Preconditions.checkArgument(tablets.size() <= 1,
-                                    "found more than one tablet for a single partition key");
-        Preconditions.checkArgument(!tablets.isEmpty(), "found non-covered partition range");
-        return tablets.get(0);
-      }
-    });
+    return locatedTablets.addCallbackDeferring(
+        new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>() {
+          @Override
+          public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
+            Preconditions.checkArgument(tablets.size() <= 1,
+                                        "found more than one tablet for a single partition key");
+            if (tablets.size() == 0) {
+              Map.Entry<byte[], byte[]> nonCoveredRange =
+                  nonCoveredRangeCaches.get(table.getTableId()).getNonCoveredRange(partitionKey);
+              return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
+                                                                     nonCoveredRange.getValue()));
+            }
+            return Deferred.fromResult(tablets.get(0));
+          }
+        });
   }
 
   /**
+   * Returns the non-covered range partition containing the {@code partitionKey} in
+   * the table, or null if there is no known non-covering range for the partition key.
+   * @param tableId of the table
+   * @param partitionKey to lookup
+   * @return the non-covering partition range, or {@code null}
+   */
+   Map.Entry<byte[], byte[]> getNonCoveredRange(String tableId, byte[] partitionKey) {
+     if (isMasterTable(tableId)) {
+       throw new IllegalArgumentException("No non-covering range partitions for the master");
+     }
+     NonCoveredRangeCache nonCoveredRangeCache = nonCoveredRangeCaches.get(tableId);
+     if (nonCoveredRangeCache == null) return null;
+
+     return nonCoveredRangeCache.getNonCoveredRange(partitionKey);
+   }
+
+  /**
    * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
    * for a replica.
    * @param masterClient An initialized client for the master replica.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index cf1e384..5e2d18c 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -426,27 +426,61 @@ public final class AsyncKuduScanner {
       return Deferred.fromResult(null);
     } else if (tablet == null) {
 
-      // We need to open the scanner first.
-      return client.openScanner(this).addCallbackDeferring(
-          new Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response>() {
-            public Deferred<RowResultIterator> call(final AsyncKuduScanner.Response resp) {
-              if (!resp.more || resp.scanner_id == null) {
-                scanFinished();
-                return Deferred.fromResult(resp.data); // there might be data to return
-              }
-              scannerId = resp.scanner_id;
-              sequenceId++;
-              hasMore = resp.more;
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Scanner " + Bytes.pretty(scannerId) + " opened on " + tablet);
-              }
-              //LOG.info("Scan.open is returning rows: " + resp.data.getNumRows());
-              return Deferred.fromResult(resp.data);
-            }
-            public String toString() {
-              return "scanner opened";
+      Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response> cb =
+          new Callback<Deferred<RowResultIterator>, Response>() {
+        @Override
+        public Deferred<RowResultIterator> call(Response resp) throws Exception {
+          if (!resp.more || resp.scanner_id == null) {
+            scanFinished();
+            return Deferred.fromResult(resp.data); // there might be data to return
+          }
+          scannerId = resp.scanner_id;
+          sequenceId++;
+          hasMore = resp.more;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Scanner " + Bytes.pretty(scannerId) + " opened on " + tablet);
+          }
+          return Deferred.fromResult(resp.data);
+        }
+        public String toString() {
+          return "scanner opened";
+        }
+      };
+
+      Callback<Deferred<RowResultIterator>, Exception> eb =
+          new Callback<Deferred<RowResultIterator>, Exception>() {
+        @Override
+        public Deferred<RowResultIterator> call(Exception e) throws Exception {
+          invalidate();
+          if (e instanceof NonCoveredRangeException) {
+            NonCoveredRangeException ncre = (NonCoveredRangeException) e;
+            nextPartitionKey = ncre.getNonCoveredRangeEnd();
+
+            // Stop scanning if the non-covered range is past the end partition key.
+            if (ncre.getNonCoveredRangeEnd().length == 0
+                || (endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
+                && Bytes.memcmp(endPartitionKey, ncre.getNonCoveredRangeEnd()) <= 0)) {
+              hasMore = false;
+              closed = true; // the scanner is closed on the other side at this point
+              return Deferred.fromResult(RowResultIterator.empty());
             }
-          });
+            nextPartitionKey = ncre.getNonCoveredRangeEnd();
+            scannerId = null;
+            sequenceId = 0;
+            return nextRows();
+          } else {
+            LOG.warn("Can not open scanner", e);
+            // Don't let the scanner think it's opened on this tablet.
+            return Deferred.fromError(e); // Let the error propogate.
+          }
+        }
+        public String toString() {
+          return "open scanner errback";
+        }
+      };
+
+      // We need to open the scanner first.
+      return client.sendRpcToTablet(getOpenRequest()).addCallbackDeferring(cb).addErrback(eb);
     } else if (prefetching && prefetcherDeferred != null) {
       // TODO KUDU-1260 - Check if this works and add a test
       prefetcherDeferred.chain(new Deferred<RowResultIterator>().addCallback(prefetch));
@@ -652,7 +686,7 @@ public final class AsyncKuduScanner {
   }
 
   /**
-   *  Helper object that contains all the info sent by a TS afer a Scan request
+   *  Helper object that contains all the info sent by a TS after a Scan request.
    */
   static final class Response {
     /** The ID associated with the scanner that issued the request.  */
@@ -690,7 +724,7 @@ public final class AsyncKuduScanner {
   /**
    * RPC sent out to fetch the next rows from the TabletServer.
    */
-  private final class ScanRequest extends KuduRpc<Response> implements KuduRpc.HasKey {
+  private final class ScanRequest extends KuduRpc<Response> {
 
     State state;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index 8ac70c4..024f66d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -319,13 +319,33 @@ public class AsyncKuduSession implements SessionConfiguration {
 
       // Group the operations by tablet.
       Map<Slice, Batch> batches = new HashMap<>();
+      List<OperationResponse> opsFailedInLookup = new ArrayList<>();
 
       for (BufferedOperation bufferedOp : buffer.getOperations()) {
         Operation operation = bufferedOp.getOperation();
-        // TODO: when we have non-covered range partitioning the tablet lookup
-        // may fail with a NonCoveredRangeException. In this case we need to
-        // handle the exception, by adding it to a special BatchResponse below
-        // containing all such failed rows.
+        if (bufferedOp.tabletLookupFailed()) {
+          Exception failure = bufferedOp.getTabletLookupFailure();
+          RowError error;
+          if (failure instanceof NonCoveredRangeException) {
+            // TODO: this should be something different than NotFound so that
+            // applications can distinguish from updates on missing rows.
+            error = new RowError(Status.NotFound(failure.getMessage()), operation);
+          } else {
+            LOG.warn("unexpected tablet lookup failure for operation {}", operation, failure);
+            error = new RowError(Status.RuntimeError(failure.getMessage()), operation);
+          }
+          OperationResponse response = new OperationResponse(0, null, 0, operation, error);
+          // Add the row error to the error collector if the session is in background flush mode,
+          // and complete the operation's deferred with the error response. The ordering between
+          // adding to the error collector and completing the deferred should not matter since
+          // applications should be using one or the other method for error handling, not both.
+          if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND) {
+            errorCollector.addError(error);
+          }
+          operation.callback(response);
+          opsFailedInLookup.add(response);
+          continue;
+        }
         LocatedTablet tablet = bufferedOp.getTablet();
         Slice tabletId = new Slice(tablet.getTabletId());
 
@@ -337,7 +357,10 @@ public class AsyncKuduSession implements SessionConfiguration {
         batch.add(operation);
       }
 
-      List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size());
+      List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1);
+      if (!opsFailedInLookup.isEmpty()) {
+        batchResponses.add(Deferred.fromResult(new BatchResponse(opsFailedInLookup)));
+      }
 
       for (Batch batch : batches.values()) {
         if (timeoutMs != 0) {
@@ -762,15 +785,16 @@ public class AsyncKuduSession implements SessionConfiguration {
    * Container class holding all the state associated with a buffered operation.
    */
   private static final class BufferedOperation {
-    private LocatedTablet tablet = null;
+    /** Holds either a {@link LocatedTablet} or the failure exception if the lookup failed. */
+    private Object tablet = null;
     private final Deferred<Void> tabletLookup;
     private final Operation operation;
 
     public BufferedOperation(Deferred<LocatedTablet> tablet,
                              Operation operation) {
-      tabletLookup = tablet.addCallback(new Callback<Void, LocatedTablet>() {
+      tabletLookup = AsyncUtil.addBoth(tablet, new Callback<Void, Object>() {
         @Override
-        public Void call(final LocatedTablet tablet) {
+        public Void call(final Object tablet) {
           BufferedOperation.this.tablet = tablet;
           return null;
         }
@@ -778,8 +802,29 @@ public class AsyncKuduSession implements SessionConfiguration {
       this.operation = Preconditions.checkNotNull(operation);
     }
 
+    /**
+     * @return {@code true} if the tablet lookup failed.
+     */
+    public boolean tabletLookupFailed() {
+      return !(tablet instanceof LocatedTablet);
+    }
+
+    /**
+     * @return the located tablet
+     * @throws ClassCastException if the tablet lookup failed,
+     *         check with {@link #tabletLookupFailed} before calling
+     */
     public LocatedTablet getTablet() {
-      return tablet;
+      return (LocatedTablet) tablet;
+    }
+
+    /**
+     * @return the cause of the failed lookup
+     * @throws ClassCastException if the tablet lookup succeeded,
+     *         check with {@link #tabletLookupFailed} before calling
+     */
+    public Exception getTabletLookupFailure() {
+      return (Exception) tablet;
     }
 
     public Deferred<Void> getTabletLookup() {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
index 0a14ae8..3e3b960 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
@@ -40,7 +40,7 @@ import org.kududb.util.Slice;
  * server.
  */
 @InterfaceAudience.Private
-public class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey {
+class Batch extends KuduRpc<BatchResponse> {
 
   /** Holds batched operations. */
   final List<Operation> operations = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java b/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
index ee9cc66..f67153b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.kududb.client;
 
+import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -76,6 +78,13 @@ public class BatchResponse extends KuduRpcResponse {
     assert (individualResponses.size() == operations.size());
   }
 
+  BatchResponse(List<OperationResponse> individualResponses) {
+    super(0, null);
+    writeTimestamp = 0;
+    rowErrors = ImmutableList.of();
+    this.individualResponses = individualResponses;
+  }
+
   /**
    * Gives the write timestamp that was returned by the Tablet Server.
    * @return a timestamp in milliseconds, 0 if the external consistency mode set in AsyncKuduSession

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
index 0efaaa0..0e68e93 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
@@ -25,6 +25,7 @@
  */
 package org.kududb.client;
 
+import com.google.common.io.BaseEncoding;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 import org.kududb.annotations.InterfaceAudience;
@@ -792,11 +793,6 @@ public final class Bytes {
 
   /**
    * Convert a byte array to a hex encoded string.
-   *
-   * TODO: replace this with {@link com.google.common.io.BaseEncoding}
-   * when the Guava version is bumped.
-   *
-   * https://stackoverflow.com/questions/9655181/how-to-convert-a-byte-array-to-a-hex-string-in-java
    * @param bytes the bytes to encode
    * @return the hex encoded bytes
    */
@@ -804,12 +800,7 @@ public final class Bytes {
     StringBuilder sb = new StringBuilder(2 + bytes.length * 2);
     sb.append('0');
     sb.append('x');
-
-    for (byte b : bytes) {
-      int v = b & 0xFF;
-      sb.append(HEX[v >>> 4]);
-      sb.append(HEX[v & 0x0F]);
-    }
+    sb.append(BaseEncoding.base16().encode(bytes));
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/CreateTableOptions.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/CreateTableOptions.java b/java/kudu-client/src/main/java/org/kududb/client/CreateTableOptions.java
index be81d5a..20bc4c3 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/CreateTableOptions.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/CreateTableOptions.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.kududb.client;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import java.util.List;
@@ -24,6 +25,7 @@ import org.kududb.Common;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 import org.kududb.master.Master;
+import org.kududb.util.Pair;
 
 /**
  * This is a builder class for all the options that can be provided while creating a table.
@@ -34,6 +36,7 @@ public class CreateTableOptions {
 
   private Master.CreateTableRequestPB.Builder pb = Master.CreateTableRequestPB.newBuilder();
   private final List<PartialRow> splitRows = Lists.newArrayList();
+  private final List<Pair<PartialRow, PartialRow>> rangeBounds = Lists.newArrayList();
 
   /**
    * Add a split point for the table. The table in the end will have splits + 1 tablets.
@@ -48,6 +51,29 @@ public class CreateTableOptions {
   }
 
   /**
+   * Add a partition range bound to the table with an inclusive lower bound and
+   * exclusive upper bound.
+   *
+   * If either row is empty, then that end of the range will be unbounded. If a
+   * range column is missing a value, the logical minimum value for that column
+   * type will be used as the default.
+   *
+   * Multiple range bounds may be added, but they must not overlap. All split
+   * rows must fall in one of the range bounds. The lower bound must be less
+   * than the upper bound.
+   *
+   * If not provided, the table's range will be unbounded.
+   *
+   * @param lower the inclusive lower bound
+   * @param upper the exclusive upper bound
+   * @return this instance
+   */
+  public CreateTableOptions addRangeBound(PartialRow lower, PartialRow upper) {
+    rangeBounds.add(new Pair<>(new PartialRow(lower), new PartialRow(upper)));
+    return this;
+  }
+
+  /**
    * Add a set of hash partitions to the table.
    *
    * Each column must be a part of the table's primary key, and an individual
@@ -129,9 +155,18 @@ public class CreateTableOptions {
   }
 
   Master.CreateTableRequestPB.Builder getBuilder() {
-    if (!splitRows.isEmpty()) {
-      pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder().encodeSplitRows(splitRows));
+    if (!splitRows.isEmpty() || !rangeBounds.isEmpty()) {
+      pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder()
+                                     .encodeSplitRowsRangeBounds(splitRows, rangeBounds));
     }
     return pb;
   }
+
+  List<Integer> getRequiredFeatureFlags() {
+    if (rangeBounds.isEmpty()) {
+      return ImmutableList.of();
+    } else {
+      return ImmutableList.of(Master.MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/CreateTableRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/CreateTableRequest.java b/java/kudu-client/src/main/java/org/kududb/client/CreateTableRequest.java
index b62d252..31ed9a2 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/CreateTableRequest.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/CreateTableRequest.java
@@ -17,6 +17,10 @@
 package org.kududb.client;
 
 import com.google.protobuf.Message;
+
+import java.util.Collection;
+import java.util.List;
+
 import org.kududb.Schema;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.master.Master;
@@ -34,6 +38,7 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
   private final Schema schema;
   private final String name;
   private final Master.CreateTableRequestPB.Builder builder;
+  private final List<Integer> featureFlags;
 
   CreateTableRequest(KuduTable masterTable, String name, Schema schema,
                      CreateTableOptions builder) {
@@ -41,6 +46,7 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
     this.schema = schema;
     this.name = name;
     this.builder = builder.getBuilder();
+    featureFlags = builder.getRequiredFeatureFlags();
   }
 
   @Override
@@ -69,4 +75,9 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
     return new Pair<CreateTableResponse, Object>(
         response, builder.hasError() ? builder.getError() : null);
   }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return featureFlags;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
index 41a9636..1fe2e6e 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
@@ -67,13 +67,14 @@ public abstract class KuduRpc<R> {
 
   private static final Logger LOG = LoggerFactory.getLogger(KuduRpc.class);
 
-  public interface HasKey {
-    /**
-     * Returns the partition key this RPC is for.
-     * <p>
-     * <strong>DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED.</strong>
-     */
-    byte[] partitionKey();
+  /**
+   * Returns the partition key this RPC is for, or {@code null} if the RPC is
+   * not tablet specific.
+   * <p>
+   * <strong>DO NOT MODIFY THE CONTENTS OF THE RETURNED ARRAY.</strong>
+   */
+  byte[] partitionKey() {
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/KuduRpcResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpcResponse.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpcResponse.java
index ff10469..981e04a 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpcResponse.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpcResponse.java
@@ -28,8 +28,8 @@ abstract class KuduRpcResponse {
 
   /**
    * Constructor with information common to all RPCs.
-   * @param elapsedMillis Time in milliseconds since RPC creation to now.
-   * @param tsUUID A string that contains the UUID of the server that answered the RPC.
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param tsUUID a string that contains the UUID of the server that answered the RPC
    */
   KuduRpcResponse(long elapsedMillis, String tsUUID) {
     this.elapsedMillis = elapsedMillis;
@@ -39,15 +39,16 @@ abstract class KuduRpcResponse {
   /**
    * Get the number of milliseconds elapsed since the RPC was created up to the moment when this
    * response was created.
-   * @return Elapsed time in milliseconds.
+   * @return elapsed time in milliseconds
    */
   public long getElapsedMillis() {
     return elapsedMillis;
   }
 
   /**
-   * Get the identifier of the tablet server that sent the response.
-   * @return A string containing a UUID.
+   * Get the identifier of the tablet server that sent the response. May be
+   * {@code null} if the RPC failed before tablet location lookup succeeded.
+   * @return a string containing a UUID
    */
   public String getTsUUID() {
     return tsUUID;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeCache.java b/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeCache.java
new file mode 100644
index 0000000..1c3b024
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeCache.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.UnsignedBytes;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache of the non-covered range partitions in a Kudu table.
+ *
+ * Currently entries are never invalidated from the cache.
+ */
+@ThreadSafe
+@InterfaceAudience.Private
+class NonCoveredRangeCache {
+  private static final Logger LOG = LoggerFactory.getLogger(NonCoveredRangeCache.class);
+  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
+
+  private final ConcurrentNavigableMap<byte[], byte[]> nonCoveredRanges =
+      new ConcurrentSkipListMap<>(COMPARATOR);
+
+  /**
+   * Retrieves a non-covered range from the cache.
+   *
+   * The pair contains the inclusive start partition key and the exclusive end
+   * partition key containing the provided partition key. If there is no such
+   * cached range, null is returned.
+   *
+   * @param partitionKey the partition key to lookup in the cache
+   * @return the non covered range, or null
+   */
+  public Map.Entry<byte[], byte[]> getNonCoveredRange(byte[] partitionKey) {
+    Map.Entry<byte[], byte[]> range = nonCoveredRanges.floorEntry(partitionKey);
+    if (range == null ||
+        (range.getValue().length != 0 && COMPARATOR.compare(partitionKey, range.getValue()) >= 0)) {
+      return null;
+    } else {
+      return range;
+    }
+  }
+
+  /**
+   * Adds a non-covered range to the cache.
+   *
+   * @param startPartitionKey the inclusive start partition key of the non-covered range
+   * @param endPartitionKey the exclusive end partition key of the non-covered range
+   */
+  public void addNonCoveredRange(byte[] startPartitionKey, byte[] endPartitionKey) {
+    if (startPartitionKey == null || endPartitionKey == null) {
+      throw new IllegalArgumentException("Non-covered partition range keys may not be null");
+    }
+    // Concurrent additions of the same non-covered range key are handled by
+    // serializing puts through the concurrent map.
+    if (nonCoveredRanges.put(startPartitionKey, endPartitionKey) == null) {
+      LOG.info("Discovered non-covered partition range [{}, {})",
+               Bytes.hex(startPartitionKey), Bytes.hex(endPartitionKey));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append('[');
+    boolean isFirst = true;
+    for (Map.Entry<byte[], byte[]> range : nonCoveredRanges.entrySet()) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append('[');
+      sb.append(range.getKey().length == 0 ? "<start>" : Bytes.hex(range.getKey()));
+      sb.append(", ");
+      sb.append(range.getValue().length == 0 ? "<end>" : Bytes.hex(range.getValue()));
+      sb.append(')');
+    }
+    sb.append(']');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeException.java b/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeException.java
new file mode 100644
index 0000000..3400a51
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/NonCoveredRangeException.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Exception indicating that an operation attempted to access a non-covered range partition.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class NonCoveredRangeException extends KuduException {
+  private final byte[] nonCoveredRangeStart;
+  private final byte[] nonCoveredRangeEnd;
+
+  public NonCoveredRangeException(byte[] nonCoveredRangeStart, byte[] nonCoveredRangeEnd) {
+    super("non-covered range");
+    this.nonCoveredRangeStart = nonCoveredRangeStart;
+    this.nonCoveredRangeEnd = nonCoveredRangeEnd;
+  }
+
+  byte[] getNonCoveredRangeStart() {
+    return nonCoveredRangeStart;
+  }
+
+  byte[] getNonCoveredRangeEnd() {
+    return nonCoveredRangeEnd;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "NonCoveredRangeException([%s, %s))",
+        nonCoveredRangeStart.length == 0 ? "<start>" : Bytes.hex(nonCoveredRangeStart),
+        nonCoveredRangeEnd.length == 0 ? "<end>" : Bytes.hex(nonCoveredRangeEnd));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 8539f01..0e67c4e 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -46,7 +46,7 @@ import java.util.List;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class Operation extends KuduRpc<OperationResponse> implements KuduRpc.HasKey {
+public abstract class Operation extends KuduRpc<OperationResponse> {
   /**
    * This size will be set when serialize is called. It stands for the size of the row in this
    * operation.
@@ -58,7 +58,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
     UPDATE((byte)RowOperationsPB.Type.UPDATE.getNumber()),
     DELETE((byte)RowOperationsPB.Type.DELETE.getNumber()),
     SPLIT_ROWS((byte)RowOperationsPB.Type.SPLIT_ROW.getNumber()),
-    UPSERT((byte)RowOperationsPB.Type.UPSERT.getNumber());
+    UPSERT((byte)RowOperationsPB.Type.UPSERT.getNumber()),
+    RANGE_LOWER_BOUND((byte) RowOperationsPB.Type.RANGE_LOWER_BOUND.getNumber()),
+    RANGE_UPPER_BOUND((byte) RowOperationsPB.Type.RANGE_UPPER_BOUND.getNumber());
 
     ChangeType(byte encodedByte) {
       this.encodedByte = encodedByte;
@@ -147,7 +149,7 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       }
     }
     OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-        builder.getTimestamp(), this, error);
+                                                       builder.getTimestamp(), this, error);
     return new Pair<OperationResponse, Object>(
         response, builder.hasError() ? builder.getError() : null);
   }
@@ -313,12 +315,25 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       return toPB();
     }
 
-    public RowOperationsPB encodeSplitRows(List<PartialRow> rows) {
-      if (rows == null || rows.isEmpty()) return null;
-      init(rows.get(0).getSchema(), rows.size());
-      for (PartialRow row : rows) {
+    public RowOperationsPB encodeSplitRowsRangeBounds(List<PartialRow> splitRows,
+                                                      List<Pair<PartialRow, PartialRow>> rangeBounds) {
+      if (splitRows.isEmpty() && rangeBounds.isEmpty()) {
+        return null;
+      }
+
+      Schema schema = splitRows.isEmpty() ? rangeBounds.get(0).getFirst().getSchema()
+                                          : splitRows.get(0).getSchema();
+      init(schema, splitRows.size() + 2 * rangeBounds.size());
+
+      for (PartialRow row : splitRows) {
         encodeRow(row, ChangeType.SPLIT_ROWS);
       }
+
+      for (Pair<PartialRow, PartialRow> bound : rangeBounds) {
+        encodeRow(bound.getFirst(), ChangeType.RANGE_LOWER_BOUND);
+        encodeRow(bound.getSecond(), ChangeType.RANGE_UPPER_BOUND);
+      }
+
       return toPB();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/OperationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/OperationResponse.java b/java/kudu-client/src/main/java/org/kududb/client/OperationResponse.java
index 3e3e0e9..bf707ce 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/OperationResponse.java
@@ -78,8 +78,8 @@ public class OperationResponse extends KuduRpcResponse {
 
   /**
    * Gives the write timestamp that was returned by the Tablet Server.
-   * @return a timestamp in milliseconds, 0 if the external consistency mode set in AsyncKuduSession
-   * wasn't CLIENT_PROPAGATED
+   * @return a timestamp in milliseconds, 0 if the external consistency mode set
+   *         in AsyncKuduSession wasn't CLIENT_PROPAGATED, or if the operation failed.
    */
   public long getWriteTimestamp() {
     return writeTimestamp;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/Partition.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Partition.java b/java/kudu-client/src/main/java/org/kududb/client/Partition.java
index 6e8951e..bdc089b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Partition.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Partition.java
@@ -176,7 +176,7 @@ public class Partition implements Comparable<Partition> {
   @Override
   public String toString() {
     return String.format("[%s, %s)",
-                         Bytes.pretty(partitionKeyStart),
-                         Bytes.pretty(partitionKeyEnd));
+                         partitionKeyStart.length == 0 ? "<start>" : Bytes.hex(partitionKeyStart),
+                         partitionKeyEnd.length == 0 ? "<end>" : Bytes.hex(partitionKeyEnd));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/RowError.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RowError.java b/java/kudu-client/src/main/java/org/kududb/client/RowError.java
index 1fd8381..b4c8f36 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/RowError.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/RowError.java
@@ -32,7 +32,7 @@ public class RowError {
   private final String tsUUID;
 
   /**
-   * Package-private for unit tests.
+   * Creates a new {@code RowError} with the provided status, operation, and tablet server UUID.
    */
   RowError(Status status, Operation operation, String tsUUID) {
     this.status = status;
@@ -41,6 +41,16 @@ public class RowError {
   }
 
   /**
+   * Creates a new {@code RowError} with the provided status and operation.
+   *
+   * This constructor should be used when the operation fails before the tablet
+   * lookup is complete; the tablet server UUID is set to {@code null}.
+   */
+  RowError(Status status, Operation operation) {
+    this(status, operation, null);
+  }
+
+  /**
    * Get the status code and message of the row error.
    */
   public Status getErrorStatus() {
@@ -67,7 +77,7 @@ public class RowError {
 
   /**
    * Get the Operation that failed.
-   * @return The same Operation instance that failed.
+   * @return The same Operation instance that failed
    */
   public Operation getOperation() {
     return operation;
@@ -75,7 +85,9 @@ public class RowError {
 
   /**
    * Get the identifier of the tablet server that sent the error.
-   * @return A string containing a UUID.
+   * The UUID may be {@code null} if the failure occurred before sending the row
+   * to a tablet server (for instance, if the row falls in a non-covered range partition).
+   * @return A string containing a UUID
    */
   public String getTsUUID() {
     return tsUUID;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/main/java/org/kududb/client/RowResultIterator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RowResultIterator.java b/java/kudu-client/src/main/java/org/kududb/client/RowResultIterator.java
index 3ed0822..a3c6941 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/RowResultIterator.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/RowResultIterator.java
@@ -32,6 +32,8 @@ import org.kududb.util.Slice;
 public class RowResultIterator extends KuduRpcResponse implements Iterator<RowResult>,
     Iterable<RowResult> {
 
+  private static final RowResultIterator EMPTY = new RowResultIterator(0, null, null, null, null);
+
   private final Schema schema;
   private final Slice bs;
   private final Slice indirectBs;
@@ -72,6 +74,13 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
     this.rowResult = new RowResult(this.schema, this.bs, this.indirectBs);
   }
 
+  /**
+   * @return an empty row result iterator
+   */
+  static RowResultIterator empty() {
+    return EMPTY;
+  }
+
   @Override
   public boolean hasNext() {
     return this.currentRow < numRows;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
index 994fec1..1464fa4 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
@@ -236,6 +236,37 @@ public class BaseKuduTest {
     return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"));
   }
 
+  /**
+   * Creates table options with non-covering range partitioning for a table with
+   * the basic schema. Range bounds [0, 100) and [200, 300) are added together with
+   * a split row at 50, so the range partition key ranges are:
+   * [  0,  50)
+   * [ 50, 100)
+   * [200, 300)
+   */
+  public static CreateTableOptions getBasicTableOptionsWithNonCoveredRange() {
+    Schema schema = basicSchema;
+    CreateTableOptions option = new CreateTableOptions();
+    option.setRangePartitionColumns(ImmutableList.of("key"));
+
+    PartialRow aLowerBound = schema.newPartialRow();
+    aLowerBound.addInt("key", 0);
+    PartialRow aUpperBound = schema.newPartialRow();
+    aUpperBound.addInt("key", 100);
+    option.addRangeBound(aLowerBound, aUpperBound);
+
+    PartialRow bLowerBound = schema.newPartialRow();
+    bLowerBound.addInt("key", 200);
+    PartialRow bUpperBound = schema.newPartialRow();
+    bUpperBound.addInt("key", 300);
+    option.addRangeBound(bLowerBound, bUpperBound);
+
+    PartialRow split = schema.newPartialRow();
+    split.addInt("key", 50);
+    option.addSplitRow(split);
+    return option;
+  }
+
   protected Insert createBasicSchemaInsert(KuduTable table, int key) {
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/test/java/org/kududb/client/TestFlexiblePartitioning.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestFlexiblePartitioning.java b/java/kudu-client/src/test/java/org/kududb/client/TestFlexiblePartitioning.java
index 1efba5a..dafd74a 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestFlexiblePartitioning.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestFlexiblePartitioning.java
@@ -197,7 +197,7 @@ public class TestFlexiblePartitioning extends BaseKuduTest {
     }
   }
 
-  @Test
+  @Test(timeout = 100000)
   public void testHashBucketedTable() throws Exception {
     CreateTableOptions tableBuilder = new CreateTableOptions();
     tableBuilder.addHashPartitions(ImmutableList.of("a"), 3);
@@ -206,7 +206,7 @@ public class TestFlexiblePartitioning extends BaseKuduTest {
     testPartitionSchema(tableBuilder);
   }
 
-  @Test
+  @Test(timeout = 100000)
   public void testNonDefaultRangePartitionedTable() throws Exception {
     Schema schema = createSchema();
     CreateTableOptions tableBuilder = new CreateTableOptions();
@@ -224,7 +224,7 @@ public class TestFlexiblePartitioning extends BaseKuduTest {
     testPartitionSchema(tableBuilder);
   }
 
-  @Test
+  @Test(timeout = 100000)
   public void testHashBucketedAndRangePartitionedTable() throws Exception {
     Schema schema = createSchema();
     CreateTableOptions tableBuilder = new CreateTableOptions();
@@ -244,7 +244,71 @@ public class TestFlexiblePartitioning extends BaseKuduTest {
     testPartitionSchema(tableBuilder);
   }
 
-  @Test
+  @Test(timeout = 100000)
+  public void testNonCoveredRangePartitionedTable() throws Exception {
+    Schema schema = createSchema();
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("a", "b", "c"));
+
+    // Create a non covered range between (3, 5, 6) and (4, 0, 0). Every range
+    // column ("a", "b" and "c") is set explicitly on each bound.
+    PartialRow lowerBoundA = schema.newPartialRow();
+    lowerBoundA.addString("a", "0");
+    lowerBoundA.addString("b", "0");
+    lowerBoundA.addString("c", "0");
+    PartialRow upperBoundA = schema.newPartialRow();
+    upperBoundA.addString("a", "3");
+    upperBoundA.addString("b", "5");
+    upperBoundA.addString("c", "6");
+    tableBuilder.addRangeBound(lowerBoundA, upperBoundA);
+
+    PartialRow lowerBoundB = schema.newPartialRow();
+    lowerBoundB.addString("a", "4");
+    lowerBoundB.addString("b", "0");
+    lowerBoundB.addString("c", "0");
+    PartialRow upperBoundB = schema.newPartialRow();
+    upperBoundB.addString("a", "5");
+    upperBoundB.addString("b", "5");
+    upperBoundB.addString("c", "6");
+    tableBuilder.addRangeBound(lowerBoundB, upperBoundB);
+
+    testPartitionSchema(tableBuilder);
+  }
+
+  @Test(timeout = 100000)
+  public void testHashBucketedAndNonCoveredRangePartitionedTable() throws Exception {
+    Schema schema = createSchema();
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("a", "b", "c"));
+
+    // Create a non covered range between (3, 5, 6) and (4, 0, 0)
+
+    PartialRow lowerBoundA = schema.newPartialRow();
+    lowerBoundA.addString("a", "0");
+    lowerBoundA.addString("b", "0");
+    lowerBoundA.addString("c", "0");
+    PartialRow upperBoundA = schema.newPartialRow();
+    upperBoundA.addString("a", "3");
+    upperBoundA.addString("b", "5");
+    upperBoundA.addString("c", "6");
+    tableBuilder.addRangeBound(lowerBoundA, upperBoundA);
+
+    PartialRow lowerBoundB = schema.newPartialRow();
+    lowerBoundB.addString("a", "4");
+    lowerBoundB.addString("b", "0");
+    lowerBoundB.addString("c", "0");
+    PartialRow upperBoundB = schema.newPartialRow();
+    upperBoundB.addString("a", "5");
+    upperBoundB.addString("b", "5");
+    upperBoundB.addString("c", "6");
+    tableBuilder.addRangeBound(lowerBoundB, upperBoundB);
+
+    tableBuilder.addHashPartitions(ImmutableList.of("a", "b", "c"), 4);
+
+    testPartitionSchema(tableBuilder);
+  }
+
+  @Test(timeout = 100000)
   public void testSimplePartitionedTable() throws Exception {
     Schema schema = createSchema();
     CreateTableOptions tableBuilder =
@@ -262,7 +326,7 @@ public class TestFlexiblePartitioning extends BaseKuduTest {
     testPartitionSchema(tableBuilder);
   }
 
-  @Test
+  @Test(timeout = 100000)
   public void testUnpartitionedTable() throws Exception {
     CreateTableOptions tableBuilder =
         new CreateTableOptions().setRangePartitionColumns(ImmutableList.<String>of());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
index 8ee0eb4..3591b7b 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
@@ -28,8 +28,10 @@ import static org.kududb.client.RowResult.timestampToString;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -406,6 +408,67 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Counts the rows in a table between two optional bounds.
+   * @param table the table to scan, must have the basic schema
+   * @param lowerBound an optional lower bound key
+   * @param upperBound an optional upper bound key
+   * @return the row count
+   * @throws Exception on error
+   */
+  private int countRowsForTestScanNonCoveredTable(KuduTable table,
+                                                  Integer lowerBound,
+                                                  Integer upperBound) throws Exception {
+
+    KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
+    if (lowerBound != null) {
+      PartialRow bound = basicSchema.newPartialRow();
+      bound.addInt(0, lowerBound);
+      scanBuilder.lowerBound(bound);
+    }
+    if (upperBound != null) {
+      PartialRow bound = basicSchema.newPartialRow();
+      bound.addInt(0, upperBound);
+      scanBuilder.exclusiveUpperBound(bound);
+    }
+
+    KuduScanner scanner = scanBuilder.build();
+    int count = 0;
+    while (scanner.hasMoreRows()) {
+      count += scanner.nextRows().getNumRows();
+    }
+    return count;
+  }
+
+  /**
+   * Tests scanning a table with non-covering range partitions.
+   */
+  @Test(timeout = 100000)
+  public void testScanNonCoveredTable() throws Exception {
+
+    Schema schema = basicSchema;
+    syncClient.createTable(tableName, schema, getBasicTableOptionsWithNonCoveredRange());
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    KuduTable table = syncClient.openTable(tableName);
+
+    for (int key = 0; key < 100; key++) {
+      session.apply(createBasicSchemaInsert(table, key));
+    }
+    for (int key = 200; key < 300; key++) {
+      session.apply(createBasicSchemaInsert(table, key));
+    }
+    session.flush();
+    assertEquals(0, session.countPendingErrors());
+
+    assertEquals(200, countRowsForTestScanNonCoveredTable(table, null, null));
+    assertEquals(100, countRowsForTestScanNonCoveredTable(table, null, 200));
+    assertEquals(0, countRowsForTestScanNonCoveredTable(table, null, -1));
+    assertEquals(0, countRowsForTestScanNonCoveredTable(table, 120, 180));
+    assertEquals(0, countRowsForTestScanNonCoveredTable(table, 300, null));
+  }
+
+  /**
    * Creates a local client that we auto-close while buffering one row, then makes sure that after
    * closing that we can read the row.
    */

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
index 8ccd30b..0a9311d 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
@@ -18,20 +18,24 @@ package org.kududb.client;
 
 import java.util.List;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import static org.junit.Assert.*;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestKuduSession extends BaseKuduTest {
-  // Generate a unique table name
-  private static final String TABLE_NAME_PREFIX =
-      TestKuduSession.class.getName()+"-"+System.currentTimeMillis();
+  @Rule
+  public TestName name = new TestName();
 
   private KuduTable table;
 
   @Test(timeout = 100000)
   public void testBasicOps() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-testBasicOps";
+    String tableName = name.getMethodName();
     table = createTable(tableName, basicSchema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
@@ -54,7 +58,7 @@ public class TestKuduSession extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testIgnoreAllDuplicateRows() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-testIgnoreAllDuplicateRows";
+    String tableName = name.getMethodName();
     table = createTable(tableName, basicSchema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
@@ -86,7 +90,7 @@ public class TestKuduSession extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testBatchWithSameRow() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-testBatchWithSameRow";
+    String tableName = name.getMethodName();
     table = createTable(tableName, basicSchema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
@@ -123,7 +127,7 @@ public class TestKuduSession extends BaseKuduTest {
    */
   @Test(timeout = 10000)
   public void testConcurrentFlushes() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-testConcurrentFlushes";
+    String tableName = name.getMethodName();
     CreateTableOptions builder = getBasicCreateTableOptions();
     int numTablets = 4;
     int numRowsPerTablet = 100;
@@ -153,7 +157,7 @@ public class TestKuduSession extends BaseKuduTest {
 
   @Test(timeout = 10000)
   public void testOverWritingValues() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-OverridingValues";
+    String tableName = name.getMethodName();
     table = createTable(tableName, basicSchema, getBasicCreateTableOptions());
     KuduSession session = syncClient.newSession();
     Insert insert = createInsert(0);
@@ -190,7 +194,7 @@ public class TestKuduSession extends BaseKuduTest {
 
   @Test(timeout = 10000)
   public void testUpsert() throws Exception {
-    String tableName = TABLE_NAME_PREFIX + "-Upsert";
+    String tableName = name.getMethodName();
     table = createTable(tableName, basicSchema, getBasicCreateTableOptions());
     KuduSession session = syncClient.newSession();
 
@@ -213,6 +217,105 @@ public class TestKuduSession extends BaseKuduTest {
         rowStrings.get(0));
   }
 
+  @Test(timeout = 10000)
+  public void testInsertManualFlushNonCoveredRange() throws Exception {
+    String tableName = name.getMethodName();
+    CreateTableOptions options = getBasicTableOptionsWithNonCoveredRange();
+    options.setNumReplicas(1);
+    syncClient.createTable(tableName, basicSchema, options);
+    KuduTable target = syncClient.openTable(tableName);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+    // Apply the keys in descending order so that more table location lookups occur
+    // (the extra results in table location lookups always occur past the inserted key).
+    List<Integer> rejectedKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
+    for (int key : rejectedKeys) {
+      assertNull(session.apply(createBasicSchemaInsert(target, key)));
+    }
+    List<OperationResponse> responses = session.flush();
+    assertEquals(rejectedKeys.size(), responses.size());
+    for (int i = 0; i < responses.size(); i++) {
+      assertTrue(responses.get(i).hasRowError());
+      assertTrue(responses.get(i).getRowError().getErrorStatus().isNotFound());
+    }
+
+    // Now flush a mixed batch: of keys 90..109 exactly ten are expected to be rejected.
+    for (int key = 90; key < 110; key++) {
+      session.apply(createBasicSchemaInsert(target, key));
+    }
+    responses = session.flush();
+    int rejected = 0;
+    for (OperationResponse response : responses) {
+      if (!response.hasRowError()) {
+        continue;
+      }
+      assertTrue(response.getRowError().getErrorStatus().isNotFound());
+      rejected += 1;
+    }
+    assertEquals(10, rejected);
+  }
+
+  @Test(timeout = 10000)
+  public void testInsertAutoFlushSyncNonCoveredRange() throws Exception {
+    String tableName = name.getMethodName();
+    CreateTableOptions options = getBasicTableOptionsWithNonCoveredRange();
+    options.setNumReplicas(1);
+    syncClient.createTable(tableName, basicSchema, options);
+    KuduTable target = syncClient.openTable(tableName);
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+
+    // With synchronous flushing every apply() below must throw NonCoveredRangeException.
+    for (int key : ImmutableList.of(350, 300, 199, 150, 100, -1, -50)) {
+      boolean rejected = false;
+      try {
+        session.apply(createBasicSchemaInsert(target, key));
+      } catch (NonCoveredRangeException expected) {
+        rejected = true;
+      }
+      assertTrue("apply should have thrown", rejected);
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testInsertAutoFlushBackgrounNonCoveredRange() throws Exception {
+    String tableName = name.getMethodName();
+    CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
+    createOptions.setNumReplicas(1);
+    syncClient.createTable(tableName, basicSchema, createOptions);
+    KuduTable table = syncClient.openTable(tableName);
+
+    AsyncKuduSession session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    // Each insert is joined individually and must come back with a NotFound row error.
+    List<Integer> nonCoveredKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
+    for (int key : nonCoveredKeys) {
+      OperationResponse result = session.apply(createBasicSchemaInsert(table, key)).join(5000);
+      assertTrue(result.hasRowError());
+      assertTrue(result.getRowError().getErrorStatus().isNotFound());
+    }
+    // The same failures must also be reported as the session's pending errors.
+    RowErrorsAndOverflowStatus errors = session.getPendingErrors();
+    assertEquals(nonCoveredKeys.size(), errors.getRowErrors().length);
+    for (RowError error : errors.getRowErrors()) {
+      assertTrue(error.getErrorStatus().isNotFound());
+    }
+    // Insert a batch of some valid and some invalid, then wait for the flush to finish:
+    // flush() on the async session is not blocking, so join it before reading the errors.
+    for (int key = 90; key < 110; key++) {
+      session.apply(createBasicSchemaInsert(table, key));
+    }
+    session.flush().join(5000);
+
+    errors = session.getPendingErrors();
+    assertEquals(10, errors.getRowErrors().length);
+    for (RowError error : errors.getRowErrors()) {
+      assertTrue(error.getErrorStatus().isNotFound());
+    }
+  }
+
   private Insert createInsert(int key) {
     return createBasicSchemaInsert(table, key);
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/a8a2e198/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
index d9e7c9d..3a1160f 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.kududb.client;
 
+import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.kududb.ColumnSchema;
 import org.kududb.Schema;
 import org.kududb.Type;
@@ -29,13 +31,11 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
-import com.google.common.collect.ImmutableList;
-
 public class TestKuduTable extends BaseKuduTest {
-
   private static final Logger LOG = LoggerFactory.getLogger(TestKuduTable.class);
 
-  private static final String BASE_TABLE_NAME = TestKuduTable.class.getName();
+  @Rule
+  public TestName name = new TestName();
 
   private static Schema schema = getBasicSchema();
 
@@ -46,7 +46,7 @@ public class TestKuduTable extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testAlterTable() throws Exception {
-    String tableName = BASE_TABLE_NAME + System.currentTimeMillis();
+    String tableName = name.getMethodName();
     createTable(tableName, basicSchema, getBasicCreateTableOptions());
     try {
 
@@ -114,7 +114,7 @@ public class TestKuduTable extends BaseKuduTest {
    */
   @Test
   public void testGetLocations() throws Exception {
-    String table1 = BASE_TABLE_NAME + System.currentTimeMillis();
+    String table1 = name.getMethodName() + System.currentTimeMillis();
 
     // Test a non-existing table
     try {
@@ -124,7 +124,7 @@ public class TestKuduTable extends BaseKuduTest {
       // expected
     }
     // Test with defaults
-    String tableWithDefault = BASE_TABLE_NAME + "WithDefault" + System.currentTimeMillis();
+    String tableWithDefault = name.getMethodName() + "WithDefault" + System.currentTimeMillis();
     CreateTableOptions builder = getBasicCreateTableOptions();
     List<ColumnSchema> columns = new ArrayList<ColumnSchema>(schema.getColumnCount());
     int defaultInt = 30;
@@ -213,6 +213,47 @@ public class TestKuduTable extends BaseKuduTest {
     assertTrue(client.tableExists(tableWithDefault).join(DEFAULT_SLEEP));
   }
 
+  @Test(timeout = 100000)
+  public void testLocateTableNonCoveringRange() throws Exception {
+    String tableName = name.getMethodName();
+    syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+    KuduTable table = syncClient.openTable(tableName);
+    final int deadline = 100000;
+
+    // No bounds: all three tablets, [0, 50), [50, 100) and [200, 300), are expected.
+    List<LocatedTablet> all = table.getTabletsLocations(null, null, deadline);
+    assertEquals(3, all.size());
+    assertArrayEquals(getKeyInBytes(0), all.get(0).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(50), all.get(0).getPartition().getPartitionKeyEnd());
+    assertArrayEquals(getKeyInBytes(50), all.get(1).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(100), all.get(1).getPartition().getPartitionKeyEnd());
+    assertArrayEquals(getKeyInBytes(200), all.get(2).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(300), all.get(2).getPartition().getPartitionKeyEnd());
+
+    // End key 50 (key < 50): only the [0, 50) tablet is expected.
+    List<LocatedTablet> below50 = table.getTabletsLocations(null, getKeyInBytes(50), deadline);
+    assertEquals(1, below50.size());
+    assertArrayEquals(getKeyInBytes(0), below50.get(0).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(50), below50.get(0).getPartition().getPartitionKeyEnd());
+
+    // Start key 300 (key >= 300): no tablet is expected.
+    List<LocatedTablet> from300 = table.getTabletsLocations(getKeyInBytes(300), null, deadline);
+    assertEquals(0, from300.size());
+
+    // Start key 299 (key >= 299): only the [200, 300) tablet is expected.
+    List<LocatedTablet> from299 = table.getTabletsLocations(getKeyInBytes(299), null, deadline);
+    assertEquals(1, from299.size());
+    assertArrayEquals(getKeyInBytes(200), from299.get(0).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(300), from299.get(0).getPartition().getPartitionKeyEnd());
+
+    // 150 <= key < 250 starts in the uncovered gap: only the [200, 300) tablet is expected.
+    List<LocatedTablet> spanning =
+        table.getTabletsLocations(getKeyInBytes(150), getKeyInBytes(250), deadline);
+    assertEquals(1, spanning.size());
+    assertArrayEquals(getKeyInBytes(200), spanning.get(0).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(300), spanning.get(0).getPartition().getPartitionKeyEnd());
+  }
+
   public byte[] getKeyInBytes(int i) {
     PartialRow row = schema.newPartialRow();
     row.addInt(0, i);
@@ -220,7 +261,7 @@ public class TestKuduTable extends BaseKuduTest {
   }
 
   public KuduTable createTableWithSplitsAndTest(int splitsCount) throws Exception {
-    String tableName = BASE_TABLE_NAME + System.currentTimeMillis();
+    String tableName = name.getMethodName() + System.currentTimeMillis();
     CreateTableOptions builder = getBasicCreateTableOptions();
 
     if (splitsCount != 0) {