You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/04/28 22:07:54 UTC

[2/3] kudu git commit: KUDU-579 [java_client] Scanner fault tolerance

KUDU-579 [java_client] Scanner fault tolerance

This patch adds Java client support for restarting scanners that
fail in the middle of a table scan. AsyncKuduScanner records
the last primary key retrieved in the previous batch. If a tserver
fails while scanning, the client marks the tserver as failed and
retries the scan elsewhere, passing along the last primary key it received.

Change-Id: I89d3634c4255b69e28f2de5412e6a5a9d34e931b
Reviewed-on: http://gerrit.cloudera.org:8080/6566
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b36b84dd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b36b84dd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b36b84dd

Branch: refs/heads/master
Commit: b36b84ddb70a909873b8fb26a96d2adab68919b7
Parents: 31751df
Author: hahao <ha...@cloudera.com>
Authored: Thu Apr 20 00:42:16 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Apr 28 18:34:02 2017 +0000

----------------------------------------------------------------------
 .../kudu/client/AbstractKuduScannerBuilder.java |  32 +++---
 .../org/apache/kudu/client/AsyncKuduClient.java |  15 +++
 .../apache/kudu/client/AsyncKuduScanner.java    | 105 ++++++++++++++-----
 .../org/apache/kudu/client/KuduScanToken.java   |   5 +
 .../org/apache/kudu/client/KuduScanner.java     |  21 +++-
 .../apache/kudu/client/RowResultIterator.java   |   2 +-
 .../kudu/client/ScannerExpiredException.java    |  47 +++++++++
 .../org/apache/kudu/client/TabletClient.java    |  12 ++-
 .../org/apache/kudu/client/BaseKuduTest.java    |  70 ++++++++++---
 .../kudu/client/ITFaultTolerantScanner.java     |  62 +++++++++++
 .../kudu/client/ITNonFaultTolerantScanner.java  | 103 ++++++++++++++++++
 .../kudu/client/ITScannerMultiTablet.java       |  81 ++++++++------
 12 files changed, 462 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index f1e0f10..12f4971 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -43,7 +43,7 @@ public abstract class AbstractKuduScannerBuilder
   final Map<String, KuduPredicate> predicates = new HashMap<>();
 
   AsyncKuduScanner.ReadMode readMode = AsyncKuduScanner.ReadMode.READ_LATEST;
-  Common.OrderMode orderMode = Common.OrderMode.UNORDERED;
+  boolean isFaultTolerant = false;
   int batchSizeBytes = 1024 * 1024;
   long limit = Long.MAX_VALUE;
   boolean prefetching = false;
@@ -74,23 +74,27 @@ public abstract class AbstractKuduScannerBuilder
     return (S) this;
   }
 
-  /**
-   * Return scan results in primary key sorted order.
-   *
-   * If the table is hash partitioned, the scan must have an equality predicate
-   * on all hashed columns.
+ /**
+   * Make scans resumable at another tablet server if current server fails if
+   * isFaultTolerant is true.
+   * <p>
+   * Scans are by default non fault-tolerant, and scans will fail
+   * if scanning an individual tablet fails (for example, if a tablet server
+   * crashes in the middle of a tablet scan). If isFaultTolerant is set to true,
+   * scans will be resumed at another tablet server in the case of failure.
    *
-   * Package private until proper hash partitioning equality predicate checks
-   * are in place.
+   * Fault-tolerant scans typically have lower throughput than non
+   * fault-tolerant scans. Fault tolerant scans use READ_AT_SNAPSHOT read mode.
+   * If no snapshot timestamp is provided, the server will pick one.
    *
-   * Disabled by default.
+   * @param isFaultTolerant a boolean that indicates if scan is fault-tolerant
    * @return this instance
    */
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  S sortResultsByPrimaryKey() {
-    orderMode = Common.OrderMode.ORDERED;
-    readMode = AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT;
+  public S setFaultTolerant(boolean isFaultTolerant) {
+    this.isFaultTolerant = isFaultTolerant;
+    if (isFaultTolerant) {
+      readMode = AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT;
+    }
     return (S) this;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index a621348..9ce0536 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -719,6 +719,21 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Forcefully shuts down the RemoteTablet connection and
+   * fails all outstanding RPCs.
+   *
+   * @param tablet the given tablet
+   * @param replicaSelection replica selection mechanism to use
+   */
+  @VisibleForTesting
+  void closeCurrentConnection(RemoteTablet tablet,
+        ReplicaSelection replicaSelection) {
+    TabletClient client = connectionCache.getClient(
+        tablet.getReplicaSelectedUUID(replicaSelection));
+    client.shutdown();
+  }
+
+  /**
    * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
    * of the tablet identified by the RPC's table and partition key.
    *

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index f9ee7a3..d471830 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -169,6 +169,8 @@ public final class AsyncKuduScanner {
    */
   private final byte[] endPrimaryKey;
 
+  private byte[] lastPrimaryKey;
+
   private final boolean prefetching;
 
   private final boolean cacheBlocks;
@@ -177,6 +179,8 @@ public final class AsyncKuduScanner {
 
   private final Common.OrderMode orderMode;
 
+  private final boolean isFaultTolerant;
+
   private long htTimestamp;
 
   private final ReplicaSelection replicaSelection;
@@ -218,7 +222,7 @@ public final class AsyncKuduScanner {
   final long scanRequestTimeout;
 
   AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
-                   List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode,
+                   List<Integer> projectedIndexes, ReadMode readMode, boolean isFaultTolerant,
                    long scanRequestTimeout,
                    Map<String, KuduPredicate> predicates, long limit,
                    boolean cacheBlocks, boolean prefetching,
@@ -235,16 +239,20 @@ public final class AsyncKuduScanner {
       checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "When specifying a " +
           "HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
     }
-    if (orderMode == Common.OrderMode.ORDERED) {
-      checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Returning rows in primary key order " +
+
+    this.isFaultTolerant = isFaultTolerant;
+    if (this.isFaultTolerant) {
+      checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Use of fault tolerance scanner " +
           "requires the read mode to be set to READ_AT_SNAPSHOT");
+      this.orderMode = Common.OrderMode.ORDERED;
+    } else {
+      this.orderMode = Common.OrderMode.UNORDERED;
     }
 
     this.client = client;
     this.table = table;
     this.pruner = pruner;
     this.readMode = readMode;
-    this.orderMode = orderMode;
     this.scanRequestTimeout = scanRequestTimeout;
     this.predicates = predicates;
     this.limit = limit;
@@ -254,6 +262,7 @@ public final class AsyncKuduScanner {
     this.endPrimaryKey = endPrimaryKey;
     this.htTimestamp = htTimestamp;
     this.batchSizeBytes = batchSizeBytes;
+    this.lastPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
 
     // Map the column names to actual columns in the table schema.
     // If the user set this to 'null', we scan all columns.
@@ -382,6 +391,11 @@ public final class AsyncKuduScanner {
           if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
             client.updateLastPropagatedTimestamp(resp.propagatedTimestamp);
           }
+
+          if (isFaultTolerant && resp.lastPrimaryKey != null) {
+            lastPrimaryKey = resp.lastPrimaryKey;
+          }
+
           if (!resp.more || resp.scannerId == null) {
             scanFinished();
             return Deferred.fromResult(resp.data); // there might be data to return
@@ -483,14 +497,24 @@ public final class AsyncKuduScanner {
   /**
    * Creates a new errback to handle errors while trying to get more rows.
    */
-  private final Callback<Exception, Exception> nextRowErrback() {
-    return new Callback<Exception, Exception>() {
-      public Exception call(final Exception error) {
+  private final Callback<Deferred<RowResultIterator>, Exception> nextRowErrback() {
+    return new Callback<Deferred<RowResultIterator>, Exception>() {
+      @Override
+      public Deferred<RowResultIterator> call(Exception e) throws Exception {
         final RemoteTablet old_tablet = tablet;  // Save before invalidate().
         String message = old_tablet + " pretends to not know " + AsyncKuduScanner.this;
-        LOG.warn(message, error);
+        LOG.warn(message, e);
         invalidate();  // If there was an error, don't assume we're still OK.
-        return error;  // Let the error propagate.
+        // If encountered ScannerExpiredException, it means the scanner
+        // on the server side expired. This exception is only thrown for fault
+        // tolerant scanner. Therefore, open a new scanner.
+        if (e instanceof ScannerExpiredException) {
+          scannerId = null;
+          sequenceId = 0;
+          return nextRows();
+        } else {
+          return Deferred.fromError(e); // Let the error propagate.
+        }
       }
 
       public String toString() {
@@ -514,6 +538,7 @@ public final class AsyncKuduScanner {
     }
     scannerId = null;
     sequenceId = 0;
+    lastPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
     invalidate();
   }
 
@@ -544,7 +569,7 @@ public final class AsyncKuduScanner {
           LOG.debug("Scanner " + Bytes.pretty(scannerId) + " closed on " +
               tablet);
         }
-        tablet = null;
+        invalidate();
         scannerId = "client debug closed".getBytes();   // Make debugging easier.
         return response == null ? null : response.data;
       }
@@ -601,6 +626,15 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * Gets the replica selection mechanism being used.
+   *
+   * @return the replica selection mechanism.
+   */
+  ReplicaSelection getReplicaSelection() {
+    return replicaSelection;
+  }
+
+  /**
    * Returns an RPC to open this scanner.
    */
   KuduRpc<Response> getOpenRequest() {
@@ -666,16 +700,20 @@ public final class AsyncKuduScanner {
      */
     private final long propagatedTimestamp;
 
+    private final byte[] lastPrimaryKey;
+
     Response(final byte[] scannerId,
              final RowResultIterator data,
              final boolean more,
              final long scanTimestamp,
-             final long propagatedTimestamp) {
+             final long propagatedTimestamp,
+             final byte[] lastPrimaryKey) {
       this.scannerId = scannerId;
       this.data = data;
       this.more = more;
       this.scanTimestamp = scanTimestamp;
       this.propagatedTimestamp = propagatedTimestamp;
+      this.lastPrimaryKey = lastPrimaryKey;
     }
 
     public String toString() {
@@ -698,7 +736,7 @@ public final class AsyncKuduScanner {
   /**
    * RPC sent out to fetch the next rows from the TabletServer.
    */
-  private final class ScanRequest extends KuduRpc<Response> {
+  final class ScanRequest extends KuduRpc<Response> {
 
     State state;
 
@@ -759,6 +797,12 @@ public final class AsyncKuduScanner {
             newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
           }
 
+          if (isFaultTolerant) {
+            if (AsyncKuduScanner.this.lastPrimaryKey.length > 0) {
+              newBuilder.setLastPrimaryKey(ZeroCopyLiteralByteString.copyFrom(lastPrimaryKey));
+            }
+          }
+
           if (AsyncKuduScanner.this.startPrimaryKey.length > 0) {
             newBuilder.setStartPrimaryKey(ZeroCopyLiteralByteString.copyFrom(startPrimaryKey));
           }
@@ -774,13 +818,11 @@ public final class AsyncKuduScanner {
                  .setBatchSizeBytes(batchSizeBytes);
           break;
         case NEXT:
-          setTablet(AsyncKuduScanner.this.tablet);
           builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
                  .setCallSeqId(AsyncKuduScanner.this.sequenceId)
                  .setBatchSizeBytes(batchSizeBytes);
           break;
         case CLOSING:
-          setTablet(AsyncKuduScanner.this.tablet);
           builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
                  .setBatchSizeBytes(0)
                  .setCloseScanner(true);
@@ -801,15 +843,27 @@ public final class AsyncKuduScanner {
       final byte[] id = resp.getScannerId().toByteArray();
       TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
 
-      if (error != null && error.getCode().equals(TabletServerErrorPB.Code.TABLET_NOT_FOUND)) {
-        if (state == State.OPENING) {
-          // Doing this will trigger finding the new location.
-          return new Pair<Response, Object>(null, error);
-        } else {
-          Status statusIncomplete = Status.Incomplete("Cannot continue scanning, " +
-              "the tablet has moved and this isn't a fault tolerant scan");
-          throw new NonRecoverableException(statusIncomplete);
-        }
+      // Error handling.
+      if (error != null) {
+        switch (error.getCode()) {
+          case TABLET_NOT_FOUND:
+          case TABLET_NOT_RUNNING:
+            if (state == State.OPENING || (state == State.NEXT && isFaultTolerant)) {
+              // Doing this will trigger finding the new location.
+              return new Pair<Response, Object>(null, error);
+            } else {
+              Status statusIncomplete = Status.Incomplete("Cannot continue scanning, " +
+                  "the tablet has moved and this isn't a fault tolerant scan");
+              throw new NonRecoverableException(statusIncomplete);
+            }
+          case SCANNER_EXPIRED:
+            if (isFaultTolerant) {
+              Status status = Status.fromTabletServerErrorPB(error);
+              throw new ScannerExpiredException(status);
+            }
+          default:
+            break;
+          }
       }
       RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
           deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
@@ -826,7 +880,8 @@ public final class AsyncKuduScanner {
           resp.hasSnapTimestamp() ? resp.getSnapTimestamp()
                                   : AsyncKuduClient.NO_TIMESTAMP,
           resp.hasPropagatedTimestamp() ? resp.getPropagatedTimestamp()
-                                        : AsyncKuduClient.NO_TIMESTAMP);
+                                        : AsyncKuduClient.NO_TIMESTAMP,
+          resp.getLastPrimaryKey().toByteArray());
       if (LOG.isDebugEnabled()) {
         LOG.debug(response.toString());
       }
@@ -865,7 +920,7 @@ public final class AsyncKuduScanner {
      */
     public AsyncKuduScanner build() {
       return new AsyncKuduScanner(
-          client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
+          client, table, projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
           htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 2f3a93c..811317c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -224,6 +224,10 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       builder.cacheBlocks(message.getCacheBlocks());
     }
 
+    if (message.hasFaultTolerant()) {
+      builder.setFaultTolerant(message.getFaultTolerant());
+    }
+
     return builder.build();
   }
 
@@ -326,6 +330,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       }
 
       proto.setCacheBlocks(cacheBlocks);
+      proto.setFaultTolerant(isFaultTolerant);
 
       try {
         PartitionPruner pruner = PartitionPruner.create(this);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 05fc8b9..85fe3b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.kudu.Schema;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
@@ -110,6 +111,24 @@ public class KuduScanner {
   }
 
   /**
+   * Returns the RemoteTablet currently being scanned, if any.
+   */
+  @VisibleForTesting
+  RemoteTablet currentTablet() {
+    return asyncScanner.currentTablet();
+  }
+
+  /**
+   * Gets the replica selection mechanism being used.
+   *
+   * @return the replica selection mechanism
+   */
+  @VisibleForTesting
+  ReplicaSelection getReplicaSelection() {
+    return asyncScanner.getReplicaSelection();
+  }
+
+  /**
    * A Builder class to build {@link KuduScanner}.
    * Use {@link KuduClient#newScannerBuilder} in order to get a builder instance.
    */
@@ -128,7 +147,7 @@ public class KuduScanner {
      */
     public KuduScanner build() {
       return new KuduScanner(new AsyncKuduScanner(
-          client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
+          client, table, projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
           htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection));

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
index 978bfd1..c0e44e6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
@@ -46,7 +46,7 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
 
   /**
    * Package private constructor, only meant to be instantiated from AsyncKuduScanner.
-   * @param ellapsedMillis ime in milliseconds since RPC creation to now
+   * @param ellapsedMillis time in milliseconds since RPC creation to now
    * @param tsUUID UUID of the tablet server that handled our request
    * @param schema schema used to parse the rows
    * @param numRows how many rows are contained in the bs slice

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/ScannerExpiredException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ScannerExpiredException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ScannerExpiredException.java
new file mode 100644
index 0000000..46485d3
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ScannerExpiredException.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+
+/**
+ * A scanner expired exception.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@SuppressWarnings("serial")
+class ScannerExpiredException extends RecoverableException {
+  /**
+   * Constructor.
+   * @param status status object containing the reason for the exception
+   * trace
+   */
+  ScannerExpiredException(Status status) {
+    super(status);
+  }
+
+  /**
+   * Constructor.
+   * @param status status object containing the reason for the exception
+   * @param cause the exception that caused this one to be thrown
+   */
+  ScannerExpiredException(Status status, Throwable cause) {
+    super(status, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 24e80f8..6220633 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -509,16 +509,18 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   private KuduException dispatchTSErrorOrReturnException(
       KuduRpc<?> rpc, Tserver.TabletServerErrorPB error,
       RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
-    WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
+    Tserver.TabletServerErrorPB.Code errCode = error.getCode();
+    WireProtocol.AppStatusPB.ErrorCode errStatusCode = error.getStatus().getCode();
     Status status = Status.fromTabletServerErrorPB(error);
-    if (error.getCode() == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
+    if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
       kuduClient.handleTabletNotFound(rpc, new RecoverableException(status), this);
       // we're not calling rpc.callback() so we rely on the client to retry that RPC
-    } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+    } else if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_RUNNING ||
+        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
       kuduClient.handleRetryableError(rpc, new RecoverableException(status));
       // The following two error codes are an indication that the tablet isn't a leader.
-    } else if (code == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
-        code == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
+    } else if (errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
+        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
       kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
     } else {
       return new NonRecoverableException(status);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 83d2492..190ba06 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -340,32 +340,58 @@ public class BaseKuduTest {
    * @throws Exception
    */
   protected static void killTabletLeader(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty() || tablets.size() > 1) {
+      fail("Currently only support killing leaders for tables containing 1 tablet, table " +
+      table.getName() + " has " + tablets.size());
+    }
+    LocatedTablet tablet = tablets.get(0);
+    if (tablet.getReplicas().size() == 1) {
+      fail("Table " + table.getName() + " only has 1 tablet, please enable replication");
+    }
+
+    Integer port = findLeaderTabletServerPort(tablet);
+    miniCluster.killTabletServerOnPort(port);
+  }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The currently running test case will be failed if the tablet has no
+   * leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param tablet a RemoteTablet which will get its leader killed
+   * @throws Exception
+   */
+  protected static void killTabletLeader(RemoteTablet tablet) throws Exception {
+    int port = findLeaderTabletServerPort(new LocatedTablet(tablet));
+    miniCluster.killTabletServerOnPort(port);
+  }
+
+  /**
+   * Finds the RPC port of the given tablet's leader tablet server.
+   * @param tablet a LocatedTablet
+   * @return the RPC port of the given tablet's leader tablet server.
+   * @throws Exception
+   */
+  private static int findLeaderTabletServerPort(LocatedTablet tablet)
+      throws Exception {
     LocatedTablet.Replica leader = null;
     DeadlineTracker deadlineTracker = new DeadlineTracker();
     deadlineTracker.setDeadline(DEFAULT_SLEEP);
     while (leader == null) {
       if (deadlineTracker.timedOut()) {
-        fail("Timed out while trying to find a leader for this table: " + table.getName());
-      }
-      List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
-      if (tablets.isEmpty() || tablets.size() > 1) {
-        fail("Currently only support killing leaders for tables containing 1 tablet, table " +
-            table.getName() + " has " + tablets.size());
-      }
-      LocatedTablet tablet = tablets.get(0);
-      if (tablet.getReplicas().size() == 1) {
-        fail("Table " + table.getName() + " only has 1 tablet, please enable replication");
+        fail("Timed out while trying to find a leader for this table");
       }
+
       leader = tablet.getLeaderReplica();
       if (leader == null) {
         LOG.info("Sleeping while waiting for a tablet LEADER to arise, currently slept " +
-            deadlineTracker.getElapsedMillis() + "ms");
+        deadlineTracker.getElapsedMillis() + "ms");
         Thread.sleep(50);
       }
     }
-
-    Integer port = leader.getRpcPort();
-    miniCluster.killTabletServerOnPort(port);
+    return leader.getRpcPort();
   }
 
   /**
@@ -428,6 +454,22 @@ public class BaseKuduTest {
   }
 
   /**
+   * Kills a tablet server that serves the given tablet's leader and restarts it.
+   * Waits between killing and restarting the process.
+   *
+   * @param tablet a RemoteTablet which will get its leader killed and restarted
+   * @throws Exception
+   */
+  protected static void restartTabletServer(RemoteTablet tablet) throws Exception {
+    int port = findLeaderTabletServerPort(new LocatedTablet(tablet));
+    miniCluster.killTabletServerOnPort(port);
+
+    Thread.sleep(1000);
+
+    miniCluster.restartDeadTabletServerOnPort(port);
+  }
+
+  /**
    * Kills, sleeps, then restarts the leader master.
    * @throws Exception
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
new file mode 100644
index 0000000..b2941a3
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.junit.Test;
+
+/**
+ * Integration tests for the fault tolerant scanner that insert enough data
+ * to trigger flushes and produce multiple data blocks.
+ */
+public class ITFaultTolerantScanner extends ITScannerMultiTablet {
+  /**
+   * Tests the fault tolerant scanner by restarting the leader tablet server in the middle
+   * of the first tablet's scan and verifies the scan results are as expected.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantScannerRestart() throws Exception {
+    faultInjectionScanner(true, true, false);
+  }
+
+  /**
+   * Tests the fault tolerant scanner by killing the leader tablet server in the middle
+   * of the first tablet's scan and verifies the scan results are as expected.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantScannerKill() throws Exception {
+    faultInjectionScanner(false, true, false);
+  }
+
+  /**
+   * Tests the fault tolerant scanner by killing the leader tablet server while scanning
+   * (right after the scan of the first tablet finishes) and verifies the results are as expected.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantScannerKillFinishFirstTablet() throws Exception {
+    faultInjectionScanner(false, true, true);
+  }
+
+  /**
+   * Tests the fault tolerant scanner by restarting the leader tablet server while scanning
+   * (right after the scan of the first tablet finishes) and verifies the results are as expected.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantScannerRestartFinishFirstTablet() throws Exception {
+    faultInjectionScanner(true, true, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
new file mode 100644
index 0000000..21ab0ad
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the non fault tolerant scanner that insert enough data
+ * to trigger flushes and produce multiple data blocks.
+ */
+public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
+
+  /**
+   * Tests KUDU-1343 with a multi-batch, multi-tablet scan.
+   */
+  @Test(timeout = 100000)
+  public void testKudu1343() throws Exception {
+    KuduScanner scanner = syncClient.newScannerBuilder(table)
+        .batchSizeBytes(1) // Just a hint, won't actually be that small
+        .build();
+
+    int rowCount = 0;
+    int loopCount = 0;
+    while (scanner.hasMoreRows()) {
+      loopCount++;
+      RowResultIterator rri = scanner.nextRows();
+      rowCount += rri.getNumRows();
+    }
+
+    assertTrue(loopCount > TABLET_COUNT);
+    assertEquals(ROW_COUNT, rowCount);
+  }
+
+  /**
+   * Verifies that a non fault tolerant scanner still finishes the scan and
+   * returns every row even if its current connection is closed mid-scan.
+   */
+  @Test(timeout = 100000)
+  public void testNonFaultTolerantDisconnect() throws KuduException {
+    KuduScanner scanner = syncClient.newScannerBuilder(table)
+        .batchSizeBytes(1)
+        .build();
+
+    int rowCount = 0;
+    int loopCount = 0;
+    if (scanner.hasMoreRows()) {
+      loopCount++;
+      RowResultIterator rri = scanner.nextRows();
+      rowCount += rri.getNumRows();
+    }
+
+    // Forcefully shuts down the current connection and
+    // fails all outstanding RPCs in the middle of
+    // scanning.
+    client.closeCurrentConnection(scanner.currentTablet(),
+        scanner.getReplicaSelection());
+
+    while (scanner.hasMoreRows()) {
+      loopCount++;
+      RowResultIterator rri = scanner.nextRows();
+      rowCount += rri.getNumRows();
+    }
+
+    assertTrue(loopCount > TABLET_COUNT);
+    assertEquals(ROW_COUNT, rowCount);
+  }
+
+  /**
+   * Tests the non fault tolerant scanner by killing the leader tablet server mid-scan and
+   * verifies it throws {@link NonRecoverableException} as expected.
+   */
+  @Test(timeout = 100000, expected = NonRecoverableException.class)
+  public void testNonFaultTolerantScannerKill() throws Exception {
+    faultInjectionScanner(false, false, false);
+  }
+
+  /**
+   * Tests the non fault tolerant scanner by restarting the leader tablet server mid-scan and
+   * verifies it throws {@link NonRecoverableException} as expected.
+   */
+  @Test(timeout = 100000, expected = NonRecoverableException.class)
+  public void testNonFaultTolerantScannerRestart() throws Exception {
+    faultInjectionScanner(true, false, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/b36b84dd/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 63931e2..78b9721 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -17,13 +17,11 @@
 package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 
 import com.google.common.collect.Lists;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import org.apache.kudu.Schema;
 
@@ -35,11 +33,11 @@ public class ITScannerMultiTablet extends BaseKuduTest {
 
   private static final String TABLE_NAME =
       ITScannerMultiTablet.class.getName()+"-"+System.currentTimeMillis();
-  private static final int ROW_COUNT = 20000;
-  private static final int TABLET_COUNT = 3;
+  protected static final int ROW_COUNT = 20000;
+  protected static final int TABLET_COUNT = 3;
 
   private static Schema schema = getBasicSchema();
-  private static KuduTable table;
+  protected static KuduTable table;
 
   private static Random random = new Random(1234);
 
@@ -75,55 +73,72 @@ public class ITScannerMultiTablet extends BaseKuduTest {
   }
 
   /**
-   * Test for KUDU-1343 with a multi-batch multi-tablet scan.
+   * Injects a failure (kill or restart of the leader TabletServer) while scanning to verify
+   * that a fault tolerant scanner continues the scan and a non fault tolerant scanner throws
+   * {@link NonRecoverableException}.
+   *
+   * Also makes sure we pass all the correct information down to the server by verifying
+   * we get rows in order from 3 tablets. We detect those tablet boundaries when keys suddenly
+   * become smaller than what was previously seen.
+   *
+   * @param shouldRestart if true restarts the leader TabletServer, otherwise kills it
+   * @param isFaultTolerant if true uses fault tolerant scanner, otherwise
+   *                        uses non fault-tolerant one
+   * @param finishFirstScan if true injects the failure once the first tablet scan finishes,
+   *                        otherwise in the middle of the first tablet scan
+   * @throws Exception from the scan (e.g. NonRecoverableException) or the failure injection
    */
-  @Test(timeout = 100000)
-  public void testKudu1343() throws Exception {
+  void faultInjectionScanner(boolean shouldRestart, boolean isFaultTolerant,
+      boolean finishFirstScan) throws Exception {
     KuduScanner scanner = syncClient.newScannerBuilder(table)
-        .batchSizeBytes(1) // Just a hint, won't actually be that small
-        .build();
+        .setFaultTolerant(isFaultTolerant)
+        .batchSizeBytes(1)
+        .setProjectedColumnIndexes(Lists.newArrayList(0)).build();

     int rowCount = 0;
-    int loopCount = 0;
-    while(scanner.hasMoreRows()) {
-      loopCount++;
+    int previousRow = -1;
+    int tableBoundariesCount = 0;
+    if (scanner.hasMoreRows()) {
       RowResultIterator rri = scanner.nextRows();
       while (rri.hasNext()) {
-        rri.next();
+        int key = rri.next().getInt(0);
+        if (key < previousRow) {
+          tableBoundariesCount++;
+        }
+        previousRow = key;
         rowCount++;
       }
     }

-    assertTrue(loopCount > TABLET_COUNT);
-    assertEquals(ROW_COUNT, rowCount);
-  }
-
-  /**
-   * Makes sure we pass all the correct information down to the server by verifying we get rows in
-   * order from 4 tablets. We detect those tablet boundaries when keys suddenly become smaller than
-   * what was previously seen.
-   */
-  @Test(timeout = 100000)
-  public void testSortResultsByPrimaryKey() throws Exception {
-    KuduScanner scanner = syncClient.newScannerBuilder(table)
-        .sortResultsByPrimaryKey()
-        .setProjectedColumnIndexes(Lists.newArrayList(0))
-        .build();
+    if (!finishFirstScan) {
+      if (shouldRestart) {
+        restartTabletServer(scanner.currentTablet());
+      } else {
+        killTabletLeader(scanner.currentTablet());
+      }
+    }

-    int rowCount = 0;
-    int previousRow = -1;
-    int tableBoundariesCount = 0;
-    while(scanner.hasMoreRows()) {
+    boolean failureInjected = false;
+    while (scanner.hasMoreRows()) {
       RowResultIterator rri = scanner.nextRows();
       while (rri.hasNext()) {
         int key = rri.next().getInt(0);
         if (key < previousRow) {
           tableBoundariesCount++;
+          if (finishFirstScan && !failureInjected) {
+            if (shouldRestart) {
+              restartTabletServer(scanner.currentTablet());
+            } else {
+              killTabletLeader(scanner.currentTablet());
+            }
+            failureInjected = true;
+          }
         }
         previousRow = key;
         rowCount++;
       }
     }
+
     assertEquals(ROW_COUNT, rowCount);
     assertEquals(TABLET_COUNT, tableBoundariesCount);
   }