You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/04/28 04:32:38 UTC

[2/5] incubator-kudu git commit: KUDU-1364. Don't clear the cache when a server disconnects

KUDU-1364. Don't clear the cache when a server disconnects

Folks seem to agree that, in the Java client, clearing the cache when a
server gets disconnected (for any reason) is not a good idea.

This patch does major surgery on how we handle cached TabletClients.
We now always keep them in RemoteTablet, and do our best to remember
whether they were a leader. ip2client is now the only cache that
clients are removed from. We now rely on the TabletClient's 'dead'
attribute to know if a connection is stale and needs to be re-established.

It's not yet clear whether entries really need to be removed from
client2tablets, or when. We'll probably only remove from it in the
future, once we support dropping ranges.

The patch also introduces a new integration test for the client. It's
still very basic, but it gives more confidence in the change.

It also fixes a dumb bug that was killing scanners too soon if there
was a disconnection.

Change-Id: I8606bfcedb09af57b66ba0f065067f0f3335a4a8
Reviewed-on: http://gerrit.cloudera.org:8080/2449
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c4180bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c4180bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c4180bb0

Branch: refs/heads/master
Commit: c4180bb03bf3425756ea09966f004153a8a31f72
Parents: 3944122
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Mar 3 20:58:38 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Thu Apr 28 01:48:53 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/AsyncKuduClient.java | 188 ++++++---
 .../org/kududb/client/AsyncKuduScanner.java     |   2 +
 .../java/org/kududb/client/TabletClient.java    |  51 ++-
 .../test/java/org/kududb/client/ITClient.java   | 396 +++++++++++++++++++
 .../org/kududb/client/TestAsyncKuduClient.java  |  20 +-
 5 files changed, 591 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index d7c5b4d..6482996 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -582,12 +582,12 @@ public class AsyncKuduClient implements AutoCloseable {
     final TabletClient client = clientFor(tablet);
     final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
     final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
-    if (client == null) {
-      // Oops, we no longer know anything about this client or tabletSlice.  Our
-      // cache was probably invalidated while the client was scanning.  This
-      // means that we lost the connection to that TabletServer, so we have to
-      // try to re-connect and check if the scanner is still good.
-      return sendRpcToTablet(next_request);
+    if (client == null || !client.isAlive()) {
+      // A null client means we either don't know about this tablet anymore (unlikely) or we
+      // couldn't find a leader (which could be triggered by a read timeout).
+      // We'll first delay the RPC in case things take some time to settle down, then retry.
+      delayedSendRpcToTablet(next_request, null);
+      return next_request.getDeferred();
     }
     next_request.attempt++;
     client.sendRpc(next_request);
@@ -596,8 +596,8 @@ public class AsyncKuduClient implements AutoCloseable {
 
   /**
    * Package-private access point for {@link AsyncKuduScanner}s to close themselves.
-   * @param scanner The scanner to close.
-   * @return A deferred object that indicates the completion of the request.
+   * @param scanner the scanner to close
+   * @return a deferred object that indicates the completion of the request.
    * The {@link AsyncKuduScanner.Response} can contain rows that were left to scan.
    */
   Deferred<AsyncKuduScanner.Response> closeScanner(final AsyncKuduScanner scanner) {
@@ -608,9 +608,9 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     final TabletClient client = clientFor(tablet);
-    if (client == null) {
-      // Oops, we no longer know anything about this client or tabletSlice.  Our
-      // cache was probably invalidated while the client was scanning.  So
+    if (client == null || !client.isAlive()) {
+      // Oops, we couldn't find a tablet server that hosts this tablet. Our
+      // cache was probably invalidated while the client was scanning. So
       // we can't close this scanner properly.
       LOG.warn("Cannot close " + scanner + " properly, no connection open for "
           + (tablet == null ? null : tablet));
@@ -643,13 +643,45 @@ public class AsyncKuduClient implements AutoCloseable {
       request.setPropagatedTimestamp(lastPropagatedTs);
     }
 
+    // If we found a tablet, we'll try to find the TS to talk to. If that TS was previously
+    // disconnected, say because we didn't query that tablet for some seconds, then we'll try to
+    // reconnect based on the old information. If that fails, we'll instead continue with the next
+    // block that queries the master.
     if (tablet != null) {
       TabletClient tabletClient = clientFor(tablet);
       if (tabletClient != null) {
-        request.setTablet(tablet);
         final Deferred<R> d = request.getDeferred();
-        tabletClient.sendRpc(request);
-        return d;
+        if (tabletClient.isAlive()) {
+          request.setTablet(tablet);
+          tabletClient.sendRpc(request);
+          return d;
+        }
+        try {
+          tablet.reconnectTabletClient(tabletClient);
+        } catch (UnknownHostException e) {
+          LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
+              tabletClient.getUuid(), e);
+          // Because of this exception, clientFor() below won't be able to find a newTabletClient
+          // and we'll delay the RPC.
+        }
+        TabletClient newTabletClient = clientFor(tablet);
+        assert (tabletClient != newTabletClient);
+
+        if (newTabletClient == null) {
+          // Wait a little bit before hitting the master.
+          delayedSendRpcToTablet(request, null);
+          return request.getDeferred();
+        }
+
+        if (!newTabletClient.isAlive()) {
+          LOG.debug("Tried reconnecting to tablet server {} but failed, " +
+              "will query the master", tabletClient.getUuid());
+          // Let fall through.
+        } else {
+          request.setTablet(tablet);
+          newTabletClient.sendRpc(request);
+          return d;
+        }
       }
     }
 
@@ -687,6 +719,7 @@ public class AsyncKuduClient implements AutoCloseable {
       this.request = request;
     }
     public Deferred<R> call(final D arg) {
+      LOG.debug("Retrying sending RPC {} after lookup", request);
       return sendRpcToTablet(request);  // Retry the RPC.
     }
     public String toString() {
@@ -937,6 +970,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
     final Exception e = new NonRecoverableException(message + request, cause);
     request.errback(e);
+    LOG.debug("Cannot continue with this RPC: {} because of: {}", request, message, e);
     return Deferred.fromError(e);
   }
 
@@ -1142,7 +1176,7 @@ public class AsyncKuduClient implements AutoCloseable {
   private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
     LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
         tablet.getTabletIdAsString());
-    tablet.removeTabletServer(server);
+    tablet.removeTabletClient(server);
   }
 
   /** Callback executed when a master lookup completes.  */
@@ -1218,7 +1252,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // If we already know about this one, just refresh the locations
       RemoteTablet currentTablet = tablet2client.get(tabletId);
       if (currentTablet != null) {
-        currentTablet.refreshServers(tabletPb);
+        currentTablet.refreshTabletClients(tabletPb);
         continue;
       }
 
@@ -1231,7 +1265,7 @@ public class AsyncKuduClient implements AutoCloseable {
       }
       LOG.info("Discovered tablet {} for table {} with partition {}",
                tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
-      rt.refreshServers(tabletPb);
+      rt.refreshTabletClients(tabletPb);
       // This is making this tablet available
       // Even if two clients were racing in this method they are putting the same RemoteTablet
       // with the same start key in the CSLM in the end
@@ -1328,7 +1362,7 @@ public class AsyncKuduClient implements AutoCloseable {
         return client;
       }
       final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(uuid);
+      client = pipeline.init(uuid, host, port);
       chan = channelFactory.newChannel(pipeline);
       ip2client.put(hostport, client);  // This is guaranteed to return null.
     }
@@ -1464,7 +1498,7 @@ public class AsyncKuduClient implements AutoCloseable {
           public ArrayList<Void> call(final ArrayList<Void> arg) {
             // Normally, now that we've shutdown() every client, all our caches should
             // be empty since each shutdown() generates a DISCONNECTED event, which
-            // causes TabletClientPipeline to call removeClientFromCache().
+            // causes TabletClientPipeline to call removeClientFromIpCache().
             HashMap<String, TabletClient> logme = null;
             synchronized (ip2client) {
               if (!ip2client.isEmpty()) {
@@ -1542,12 +1576,12 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Removes all the cache entries referred to the given client.
-   * @param client The client for which we must invalidate everything.
-   * @param remote The address of the remote peer, if known, or null.
+   * Removes the given client from the `ip2client` cache.
+   * @param client The client for which we must clear the ip cache
+   * @param remote The address of the remote peer, if known, or null
    */
-  private void removeClientFromCache(final TabletClient client,
-                                     final SocketAddress remote) {
+  private void removeClientFromIpCache(final TabletClient client,
+                                       final SocketAddress remote) {
 
     if (remote == null) {
       return;  // Can't continue without knowing the remote address.
@@ -1584,19 +1618,24 @@ public class AsyncKuduClient implements AutoCloseable {
           + hostport + "), it was found that there was no entry"
           + " corresponding to " + remote + ".  This shouldn't happen.");
     }
+  }
 
-    ArrayList<RemoteTablet> tablets = client2tablets.remove(client);
+  /**
+   * Call this method after encountering an error connecting to a tablet server so that we stop
+   * considering it a leader for the tablets it serves.
+   * @param client tablet server to use for demotion
+   */
+  void demoteAsLeaderForAllTablets(final TabletClient client) {
+    ArrayList<RemoteTablet> tablets = client2tablets.get(client);
     if (tablets != null) {
       // Make a copy so we don't need to synchronize on it while iterating.
       RemoteTablet[] tablets_copy;
       synchronized (tablets) {
         tablets_copy = tablets.toArray(new RemoteTablet[tablets.size()]);
-        tablets = null;
-        // If any other thread still has a reference to `tablets', their
-        // updates will be lost (and we don't care).
       }
       for (final RemoteTablet remoteTablet : tablets_copy) {
-        remoteTablet.removeTabletServer(client);
+        // It will be a no-op if it's not already a leader.
+        remoteTablet.demoteLeader(client);
       }
     }
   }
@@ -1620,8 +1659,8 @@ public class AsyncKuduClient implements AutoCloseable {
      */
     private boolean disconnected = false;
 
-    TabletClient init(String uuid) {
-      final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid);
+    TabletClient init(String uuid, String host, int port) {
+      final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, host, port);
       if (defaultSocketReadTimeoutMs > 0) {
         super.addLast("timeout-handler",
             new ReadTimeoutHandler(timer,
@@ -1679,10 +1718,8 @@ public class AsyncKuduClient implements AutoCloseable {
           remote = slowSearchClientIP(client);
         }
 
-        // Prevent the client from buffering requests while we invalidate
-        // everything we have about it.
         synchronized (client) {
-          removeClientFromCache(client, remote);
+          removeClientFromIpCache(client, remote);
         }
       } catch (Exception e) {
         log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
@@ -1766,7 +1803,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * succeeds.
    *
    * Subtleties:
-   * We don't keep track of a TS after it disconnects (via removeTabletServer), so if we
+   * We don't keep track of a TS after it disconnects (via removeTabletClient), so if we
    * haven't contacted one for 10 seconds (socket timeout), it will be removed from the list of
    * tabletServers. This means that if the leader fails, we only have one other TS to "promote"
    * or maybe none at all. This is partly why we then set leaderIndex to NO_LEADER_INDEX.
@@ -1793,7 +1830,7 @@ public class AsyncKuduClient implements AutoCloseable {
       this.partition = partition;
     }
 
-    void refreshServers(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
+    void refreshTabletClients(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
 
       synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
         tabletServers.clear();
@@ -1820,9 +1857,9 @@ public class AsyncKuduClient implements AutoCloseable {
             lookupExceptions.add(ex);
           }
         }
-        leaderIndex = 0;
+
         if (leaderIndex == NO_LEADER_INDEX) {
-          LOG.warn("No leader provided for tablet " + getTabletIdAsString());
+          LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
         }
 
         // If we found a tablet that doesn't contain a single location that we can resolve, there's
@@ -1846,19 +1883,48 @@ public class AsyncKuduClient implements AutoCloseable {
 
       final ArrayList<RemoteTablet> tablets = client2tablets.get(client);
 
-      if (tablets == null) {
-        // We raced with removeClientFromCache and lost. The client we got was just disconnected.
-        // Reconnect.
-        addTabletClient(uuid, host, port, isLeader);
-      } else {
-        synchronized (tablets) {
-          if (isLeader) {
-            tabletServers.add(0, client);
-          } else {
-            tabletServers.add(client);
-          }
-          tablets.add(this);
+      synchronized (tablets) {
+        tabletServers.add(client);
+        if (isLeader) {
+          leaderIndex = tabletServers.size() - 1;
         }
+        tablets.add(this);
+      }
+    }
+
+    /**
+     * Call this method when an existing TabletClient in this tablet's cache is found to be dead.
+     * It removes the passed TS from this tablet's cache and replaces it with a new instance of
+     * TabletClient. It will keep its leader status if it was already considered a leader.
+     * If the passed TabletClient was already removed, then this is a no-op.
+     * @param staleTs TS to reconnect to
+     * @throws UnknownHostException if we can't resolve server's hostname
+     */
+    void reconnectTabletClient(TabletClient staleTs) throws UnknownHostException {
+      assert (!staleTs.isAlive());
+
+      synchronized (tabletServers) {
+        int index = tabletServers.indexOf(staleTs);
+
+        if (index == -1) {
+          // Another thread already took care of it.
+          return;
+        }
+
+        boolean wasLeader = index == leaderIndex;
+
+        LOG.debug("Reconnecting to server {} for tablet {}. Was a leader? {}",
+            staleTs.getUuid(), getTabletIdAsString(), wasLeader);
+
+        boolean removed = removeTabletClient(staleTs);
+
+        if (!removed) {
+          LOG.debug("{} was already removed from tablet {}'s cache when reconnecting to it",
+              staleTs.getUuid(), getTabletIdAsString());
+        }
+
+        addTabletClient(staleTs.getUuid(), staleTs.getHost(),
+            staleTs.getPort(), wasLeader);
       }
     }
 
@@ -1873,7 +1939,7 @@ public class AsyncKuduClient implements AutoCloseable {
      * @param ts A TabletClient that was disconnected.
      * @return True if this method removed ts from the list, else false.
      */
-    boolean removeTabletServer(TabletClient ts) {
+    boolean removeTabletClient(TabletClient ts) {
       synchronized (tabletServers) {
         // TODO unit test for this once we have the infra
         int index = tabletServers.indexOf(ts);
@@ -1894,10 +1960,10 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
-     * If the passed TabletClient is the current leader, then the next one in the list will be
-     * "promoted" unless we're at the end of the list, in which case we set the leaderIndex to
-     * NO_LEADER_INDEX which will force a call to the master.
-     * @param ts A TabletClient that gave a sign that it isn't this tablet's leader.
+     * Clears the leader index if the passed tablet server is the current leader.
+     * If it is the current leader, then the next call to this tablet will have
+     * to query the master to find the new leader.
+     * @param ts a TabletClient that gave a sign that it isn't this tablet's leader
      */
     void demoteLeader(TabletClient ts) {
       synchronized (tabletServers) {
@@ -1905,15 +1971,17 @@ public class AsyncKuduClient implements AutoCloseable {
         // If this TS was removed or we're already forcing a call to the master (meaning someone
         // else beat us to it), then we just noop.
         if (index == -1 || leaderIndex == NO_LEADER_INDEX) {
+          LOG.debug("{} couldn't be demoted as the leader for {}",
+              ts.getUuid(), getTabletIdAsString());
           return;
         }
 
         if (leaderIndex == index) {
-          if (leaderIndex + 1 == tabletServers.size()) {
-            leaderIndex = NO_LEADER_INDEX;
-          } else {
-            leaderIndex++;
-          }
+          leaderIndex = NO_LEADER_INDEX;
+          LOG.debug("{} was demoted as the leader for {}", ts.getUuid(), getTabletIdAsString());
+        } else {
+          LOG.debug("{} wasn't the leader for {}, current leader is at index {}", ts.getUuid(),
+              getTabletIdAsString(), leaderIndex);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index cabca8d..80c4f67 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -743,11 +743,13 @@ public final class AsyncKuduScanner {
                  .setBatchSizeBytes(batchSizeBytes);
           break;
         case NEXT:
+          setTablet(AsyncKuduScanner.this.tablet);
           builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
                  .setCallSeqId(sequenceId)
                  .setBatchSizeBytes(batchSizeBytes);
           break;
         case CLOSING:
+          setTablet(AsyncKuduScanner.this.tablet);
           builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
                  .setBatchSizeBytes(0)
                  .setCloseScanner(true);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index e18eb55..3f91ece 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -26,8 +26,7 @@
 
 package org.kududb.client;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import com.google.common.annotations.VisibleForTesting;
 import com.stumbleupon.async.Deferred;
 
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
@@ -136,14 +135,20 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
 
   private final String uuid;
 
+  private final String host;
+
+  private final int port;
+
   private final long socketReadTimeoutMs;
 
   private SecureRpcHelper secureRpcHelper;
 
-  public TabletClient(AsyncKuduClient client, String uuid) {
+  public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
     this.kuduClient = client;
     this.uuid = uuid;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
+    this.host = host;
+    this.port = port;
   }
 
   <R> void sendRpc(KuduRpc<R> rpc) {
@@ -252,6 +257,22 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     return payload;
   }
 
+  /**
+   * Quick and dirty way to close a connection to a tablet server, if it wasn't already closed.
+   */
+  @VisibleForTesting
+  void disconnect() {
+    Channel chancopy = chan;
+    if (chancopy != null && chancopy.isConnected()) {
+      Channels.disconnect(chancopy);
+    }
+  }
+
+  /**
+   * Forcefully shuts down the connection to this tablet server and fails all the outstanding RPCs.
+   * Only use when shutting down a client.
+   * @return deferred object to use to track the shutting down of this connection
+   */
   public Deferred<Void> shutdown() {
     // First, check whether we have RPCs in flight and cancel them.
     for (Iterator<KuduRpc<?>> ite = rpcs_inflight.values().iterator(); ite
@@ -667,10 +688,12 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   private void failOrRetryRpc(final KuduRpc<?> rpc,
                               final ConnectionResetException exception) {
     AsyncKuduClient.RemoteTablet tablet = rpc.getTablet();
+    // Note As of the time of writing (03/11/16), a null tablet doesn't make sense, if we see a null
+    // tablet it's because we didn't set it properly before calling sendRpc().
     if (tablet == null) {  // Can't retry, dunno where this RPC should go.
       rpc.errback(exception);
     } else {
-      kuduClient.handleTabletNotFound(rpc, exception, this);
+      kuduClient.handleRetryableError(rpc, exception);
     }
   }
 
@@ -691,6 +714,9 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       cleanup(c);
     } else {
       LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
+      // For any other exception, likely a connection error, we clear the leader state
+      // for those tablets that this TS is the cached leader of.
+      kuduClient.demoteAsLeaderForAllTablets(this);
     }
     if (c.isOpen()) {
       Channels.close(c);  // Will trigger channelClosed(), which will cleanup()
@@ -757,6 +783,23 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     return uuid;
   }
 
+  /**
+   * Returns this tablet server's port.
+   * @return a port number that this tablet server is bound to
+   */
+  int getPort() {
+    return port;
+  }
+
+  /**
+   * Returns this tablet server's hostname. We might get many hostnames from the master for a single
+   * TS, and this is the one we picked to connect to originally.
+   * @returna string that contains this tablet server's hostname
+   */
+  String getHost() {
+    return host;
+  }
+
   public String toString() {
     final StringBuilder buf = new StringBuilder(13 + 10 + 6 + 64 + 7 + 32 + 16 + 1 + 17 + 2 + 1);
     buf.append("TabletClient@")           // =13

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
new file mode 100644
index 0000000..37d513e
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
@@ -0,0 +1,396 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test for the client. RPCs are sent to Kudu from multiple threads while processes
+ * are restarted and failures are injected.
+ *
+ * By default this test runs for 60 seconds, but this can be changed by passing a different value
+ * in "itclient.runtime.seconds". For example:
+ * "mvn test -Dtest=ITClient -Ditclient.runtime.seconds=120".
+ */
+public class ITClient extends BaseKuduTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ITClient.class);
+
+  private static final String RUNTIME_PROPERTY_NAME = "itclient.runtime.seconds";
+  private static final long DEFAULT_RUNTIME_SECONDS = 60;
+  // Time we'll spend waiting at the end of the test for things to settle. Also the minimum this
+  // test can run for.
+  private static final long TEST_MIN_RUNTIME_SECONDS = 2;
+  private static final long TEST_TIMEOUT_SECONDS = 600000;
+
+  private static final String TABLE_NAME =
+      ITClient.class.getName() + "-" + System.currentTimeMillis();
+  // One error and we stop the test.
+  private static final CountDownLatch KEEP_RUNNING_LATCH = new CountDownLatch(1);
+  // Latch used to track if an error occurred and we need to stop the test early.
+  private static final CountDownLatch ERROR_LATCH = new CountDownLatch(1);
+
+  private static KuduClient localClient;
+  private static AsyncKuduClient localAsyncClient;
+  private static KuduTable table;
+  private static long runtimeInSeconds;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    String runtimeProp = System.getProperty(RUNTIME_PROPERTY_NAME);
+    runtimeInSeconds = runtimeProp == null ? DEFAULT_RUNTIME_SECONDS : Long.parseLong(runtimeProp);
+
+    if (runtimeInSeconds < TEST_MIN_RUNTIME_SECONDS || runtimeInSeconds > TEST_TIMEOUT_SECONDS) {
+      Assert.fail("This test needs to run more more than " + TEST_MIN_RUNTIME_SECONDS + " seconds" +
+          " and less than " + TEST_TIMEOUT_SECONDS + " seconds");
+    }
+
+    LOG.info ("Test running for {} seconds", runtimeInSeconds);
+
+    BaseKuduTest.setUpBeforeClass();
+
+    // Client we're using has low tolerance for read timeouts but a
+    // higher overall operation timeout.
+    localAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses)
+        .defaultSocketReadTimeoutMs(500)
+        .defaultOperationTimeoutMs(20000)
+        .build();
+    localClient = new KuduClient(localAsyncClient);
+
+    CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
+    table = localClient.createTable(TABLE_NAME, basicSchema, builder);
+  }
+
+  @Test(timeout = TEST_TIMEOUT_SECONDS)
+  public void test() throws Exception {
+    ArrayList<Thread> threads = new ArrayList<>();
+    Thread chaosThread = new Thread(new ChaosThread());
+    Thread writerThread = new Thread(new WriterThread());
+    Thread scannerThread = new Thread(new ScannerThread());
+
+    threads.add(chaosThread);
+    threads.add(writerThread);
+    threads.add(scannerThread);
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    // await() returns yes if the latch reaches 0, we don't want that.
+    Assert.assertFalse("Look for the last ERROR line in the log that comes from ITCLient",
+        ERROR_LATCH.await(runtimeInSeconds, TimeUnit.SECONDS));
+
+    // Indicate we want to stop, then wait a little bit for it to happen.
+    KEEP_RUNNING_LATCH.countDown();
+
+    for (Thread thread : threads) {
+      thread.interrupt();
+      thread.join();
+    }
+
+    AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+    int rowCount = countRowsInScan(scannerBuilder);
+    Assert.assertTrue(rowCount + " should be higher than 0", rowCount > 0);
+  }
+
+  /**
+   * Logs an error message and triggers the error count down latch, stopping this test.
+   * @param message error message to print
+   * @param exception optional exception to print
+   */
+  private void reportError(String message, Exception exception) {
+    LOG.error(message, exception);
+    ERROR_LATCH.countDown();
+  }
+
+  /**
+   * Thread that introduces chaos in the cluster, one at a time.
+   */
+  class ChaosThread implements Runnable {
+
+    private final Random random = new Random();
+
+    @Override
+    public void run() {
+      try {
+        KEEP_RUNNING_LATCH.await(2, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        return;
+      }
+      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+        try {
+          boolean shouldContinue;
+          if (System.currentTimeMillis() % 2 == 0) {
+            shouldContinue = restartTS();
+          } else {
+
+            shouldContinue = disconnectNode();
+          }
+          // TODO restarting the master currently finds more bugs. Also, adding it to the list makes
+          // it necessary to find a new weighing mechanism betweent he different chaos options.
+          // shouldContinue = restartMaster();
+
+          if (!shouldContinue) {
+            return;
+          }
+          KEEP_RUNNING_LATCH.await(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          return;
+        }
+
+      }
+    }
+
+    /**
+     * Failure injection. Picks a random tablet server from the client's cache and force
+     * disconects it.
+     * @return true if successfully completed or didn't find a server to disconnect, false it it
+     * encountered a failure
+     */
+    private boolean disconnectNode() {
+      try {
+        if (localAsyncClient.getTableClients().size() == 0) {
+          return true;
+        }
+
+        int tsToDisconnect = random.nextInt(localAsyncClient.getTableClients().size());
+        localAsyncClient.getTableClients().get(tsToDisconnect).disconnect();
+
+      } catch (Exception e) {
+        if (KEEP_RUNNING_LATCH.getCount() == 0) {
+          // Likely shutdown() related.
+          return false;
+        }
+        reportError("Couldn't disconnect a TS", e);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Forces the restart of a random tablet server.
+     * @return true if it successfully completed, false if it failed
+     */
+    private boolean restartTS() {
+      try {
+        BaseKuduTest.restartTabletServer(table);
+      } catch (Exception e) {
+        reportError("Couldn't restart a TS", e);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Forces the restart of the master.
+     * @return true if it successfully completed, false if it failed
+     */
+    private boolean restartMaster() {
+      try {
+        BaseKuduTest.restartLeaderMaster();
+      } catch (Exception e) {
+        reportError("Couldn't restart a master", e);
+        return false;
+      }
+      return true;
+    }
+
+  }
+
+  /**
+   * Thread that writes sequentially to the table. Every 10 rows it considers setting the flush mode
+   * to MANUAL_FLUSH or AUTO_FLUSH_SYNC.
+   */
+  class WriterThread implements Runnable {
+
+    private final KuduSession session = localClient.newSession();
+    private final Random random = new Random();
+    private int currentRowKey = 0;
+
+    @Override
+    public void run() {
+      session.setIgnoreAllDuplicateRows(true);
+      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+        try {
+          OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
+          if (hasRowErrorAndReport(resp)) {
+            return;
+          }
+          currentRowKey++;
+
+          // Every 10 rows we flush and change the flush mode randomly.
+          if (currentRowKey % 10 == 0) {
+
+            // First flush any accumulated rows before switching.
+            List<OperationResponse> responses = session.flush();
+            if (responses != null) {
+              for (OperationResponse batchedResp : responses) {
+                if (hasRowErrorAndReport(batchedResp)) {
+                  return;
+                }
+              }
+            }
+
+            if (random.nextBoolean()) {
+              session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+            } else {
+              session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+            }
+          }
+        } catch (Exception e) {
+          if (KEEP_RUNNING_LATCH.getCount() == 0) {
+            // Likely shutdown() related.
+            return;
+          }
+          reportError("Got error while inserting row " + currentRowKey, e);
+          return;
+        }
+      }
+    }
+
+    private boolean hasRowErrorAndReport(OperationResponse resp) {
+      if (resp != null && resp.hasRowError()) {
+        reportError("The following RPC " + resp.getOperation().getRow() +
+            " returned this error: " + resp.getRowError(), null);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Thread that scans the table. Alternates randomly between random gets and full table scans.
+   */
+  class ScannerThread implements Runnable {
+
+    private final Random random = new Random();
+
+    // Updated by calling a full scan.
+    private int lastRowCount = 0;
+
+    @Override
+    public void run() {
+      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+
+        boolean shouldContinue;
+
+        // Always scan until we find rows.
+        if (lastRowCount == 0 || random.nextBoolean()) {
+          shouldContinue = fullScan();
+        } else {
+          shouldContinue = randomGet();
+        }
+
+        if (!shouldContinue) {
+          return;
+        }
+
+        if (lastRowCount == 0) {
+          try {
+            KEEP_RUNNING_LATCH.await(50, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            // Test is stopping.
+            return;
+          }
+        }
+      }
+    }
+
+    /**
+     * Reads a row at random that it knows to exist (smaller than lastRowCount).
+     * @return
+     */
+    private boolean randomGet() {
+      int key = random.nextInt(lastRowCount);
+      KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
+          table.getSchema().getColumnByIndex(0), KuduPredicate.ComparisonOp.EQUAL, key);
+      KuduScanner scanner = localClient.newScannerBuilder(table).addPredicate(predicate).build();
+
+      List<RowResult> results = new ArrayList<>();
+      while (scanner.hasMoreRows()) {
+        try {
+          RowResultIterator ite = scanner.nextRows();
+          for (RowResult row : ite) {
+            results.add(row);
+          }
+        } catch (Exception e) {
+          return checkAndReportError("Got error while getting row " + key, e);
+        }
+      }
+
+      if (results.isEmpty() || results.size() > 1) {
+        reportError("Random get got 0 or many rows " + results.size() + " for key " + key, null);
+        return false;
+      }
+
+      int receivedKey = results.get(0).getInt(0);
+      if (receivedKey != key) {
+        reportError("Tried to get key " + key + " and received " + receivedKey, null);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Rusn a full table scan and updates the lastRowCount.
+     * @return
+     */
+    private boolean fullScan() {
+      AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+      try {
+        int rowCount = countRowsInScan(scannerBuilder);
+        if (rowCount < lastRowCount) {
+          reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null);
+          return false;
+        }
+        lastRowCount = rowCount;
+        LOG.info("New row count {}", lastRowCount);
+      } catch (Exception e) {
+        checkAndReportError("Got error while row counting", e);
+      }
+      return true;
+    }
+
+    /**
+     * Checks the passed exception contains "Scanner not found". If it does then it returns true,
+     * else it reports the error and returns false.
+     * We need to do this because the scans in this client aren't fault tolerant.
+     * @param message message to print if the exception contains a real error
+     * @param e the exception to check
+     * @return true if the scanner failed because it wasn't false, otherwise false
+     */
+    private boolean checkAndReportError(String message, Exception e) {
+      if (!e.getCause().getMessage().contains("Scanner not found")) {
+        reportError(message, e);
+        return false;
+      }
+      return true;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
index 7b4c932..ff0097b 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
@@ -17,6 +17,7 @@
 package org.kududb.client;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import com.stumbleupon.async.Deferred;
 import org.junit.BeforeClass;
@@ -48,7 +49,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
     assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
 
     // 2. Disconnect the TabletClient.
-    client.getTableClients().get(0).shutdown().join(DEFAULT_SLEEP);
+    disconnectAndWait();
 
     // 3. Count again, it will trigger a re-connection and we should not hang or fail to scan.
     assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
@@ -76,11 +77,26 @@ public class TestAsyncKuduClient extends BaseKuduTest {
         rowCount, numRows);
 
     // 4. Disconnect the TS.
-    client.getTableClients().get(0).shutdown().join(DEFAULT_SLEEP);
+    disconnectAndWait();
+
     // 5. Make sure that we can continue scanning and that we get the remaining rows back.
     assertEquals(rowCount - numRows, countRowsInScan(scanner));
   }
 
+  /**
+   * Disconnects the first TabletClient returned by getTableClients(). Then polls every 50ms, for
+   * at most DEFAULT_SLEEP ms, until getTableClients() is empty, and asserts that it emptied.
+   */
+  private void disconnectAndWait() throws InterruptedException {
+    client.getTableClients().get(0).disconnect();
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (stopwatch.elapsedMillis() < DEFAULT_SLEEP && !client.getTableClients().isEmpty()) {
+      Thread.sleep(50);
+    }
+    assertTrue(client.getTableClients().isEmpty());
+  }
+
   @Test
   public void testBadHostnames() throws Exception {
     String badHostname = "some-unknown-host-hopefully";