You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/10/12 18:06:02 UTC

kudu git commit: [java client] AsyncKuduClient#delayedSendRpcToTablet should return a Deferred

Repository: kudu
Updated Branches:
  refs/heads/master 71f21f0ab -> 7bcfcf64c


[java client] AsyncKuduClient#delayedSendRpcToTablet should return a Deferred

There's some confusion in the Java client code regarding where Deferred objects
are returned and also sent via an RPC's callback. Such a place is delayedSendRpcToTablet,
since it's called from both the context of paths that come from the user
application (need a Deferred) and paths that simply handle background retries (don't
need a Deferred since we rely on KuduRpc#callback).

This problem manifested itself in a unit test:
02:15:40.300 [DEBUG - main] (AsyncKuduClient.java:1065) Cannot continue with this RPC: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" [<start>, <end>), ignoreAllDuplicateRows=false} because of: RPC can not complete before timeout:
org.apache.kudu.client.NonRecoverableException: RPC can not complete before timeout: Batch{operations=1, tablet="4114911c600b4bc3bbca228f1880568c" [<start>, <end>), ignoreAllDuplicateRows=false}
	at org.apache.kudu.client.AsyncKuduClient.tooManyAttemptsOrTimeout(AsyncKuduClient.java:1063)
	at org.apache.kudu.client.AsyncKuduClient.delayedSendRpcToTablet(AsyncKuduClient.java:1275)
	at org.apache.kudu.client.AsyncKuduClient.sendRpcToTablet(AsyncKuduClient.java:756)
	at org.apache.kudu.client.AsyncKuduSession$TabletLookupCB.call(AsyncKuduSession.java:371)
	at org.apache.kudu.client.AsyncKuduSession$TabletLookupCB.call(AsyncKuduSession.java:302)
	at com.stumbleupon.async.Deferred.doCall(Deferred.java:1280)
	at com.stumbleupon.async.Deferred.addCallbacks(Deferred.java:685)
	at com.stumbleupon.async.Deferred.addBoth(Deferred.java:769)
	at org.apache.kudu.util.AsyncUtil.addBoth(AsyncUtil.java:59)
	at org.apache.kudu.client.AsyncKuduSession.doFlush(AsyncKuduSession.java:436)
	at org.apache.kudu.client.AsyncKuduSession.flush(AsyncKuduSession.java:405)
	at org.apache.kudu.client.TestAsyncKuduSession.testInsertIntoUnavailableTablet(TestAsyncKuduSession.java:164)

Here the Deferred.fromError that got created in tooManyAttemptsOrTimeout was ignored by delayedSendRpcToTablet, then
in sendRpcToTablet we sent back the RPC's Deferred which at that point was voided since tooManyAttemptsOrTimeout
_also_ called that RPC's callback.

I tried writing a test, but even with looping on my machine I can't repro the issue.

Change-Id: I7c5e87b7dc3366ed9d72121a2421c5cd2304608b
Reviewed-on: http://gerrit.cloudera.org:8080/4654
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7bcfcf64
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7bcfcf64
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7bcfcf64

Branch: refs/heads/master
Commit: 7bcfcf64ccc2300f482e11e73d9703b92093394b
Parents: 71f21f0
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Oct 6 15:57:28 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Oct 12 18:05:40 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7bcfcf64/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cedd111..84e60de 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -654,8 +654,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // A null client means we either don't know about this tablet anymore (unlikely) or we
       // couldn't find a leader (which could be triggered by a read timeout).
       // We'll first delay the RPC in case things take some time to settle down, then retry.
-      delayedSendRpcToTablet(next_request, null);
-      return next_request.getDeferred();
+      return delayedSendRpcToTablet(next_request, null);
     }
     client.sendRpc(next_request);
     return d;
@@ -753,8 +752,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
         if (newTabletClient == null) {
           // Wait a little bit before hitting the master.
-          delayedSendRpcToTablet(request, null);
-          return request.getDeferred();
+          return delayedSendRpcToTablet(request, null);
         }
 
         if (!newTabletClient.isAlive()) {
@@ -837,9 +835,7 @@ public class AsyncKuduClient implements AutoCloseable {
     @Override
     public Deferred<R> call(Exception arg) {
       if (arg instanceof RecoverableException) {
-        Deferred<R> d = request.getDeferred();
-        delayedSendRpcToTablet(request, (KuduException) arg);
-        return d;
+        return delayedSendRpcToTablet(request, (KuduException) arg);
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format("Notify RPC %s after lookup exception", request), arg);
@@ -1257,10 +1253,22 @@ public class AsyncKuduClient implements AutoCloseable {
 
   <R> void handleRetryableError(final KuduRpc<R> rpc, KuduException ex) {
     // TODO we don't always need to sleep, maybe another replica can serve this RPC.
+    // We don't care about the returned Deferred in this case, since we're not in a context where
+    // we're eventually returning a Deferred.
     delayedSendRpcToTablet(rpc, ex);
   }
 
-  private <R> void delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
+  /**
+   * This methods enable putting RPCs on hold for a period of time determined by
+   * {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will
+   * be immediately called.
+   * @param rpc the RPC to retry later
+   * @param ex the reason why we need to retry, might be null
+   * @return a Deferred object to use if this method is called inline with the user's original
+   * attempt to send the RPC. Can be ignored in any other context that doesn't need to return a
+   * Deferred back to the user.
+   */
+  private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
     // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
     // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
     // on hold but we won't be doing this for the moment. Regions in HBase can move a lot,
@@ -1272,11 +1280,11 @@ public class AsyncKuduClient implements AutoCloseable {
     }
     long sleepTime = getSleepTimeForRpc(rpc);
     if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
-      tooManyAttemptsOrTimeout(rpc, ex);
       // Don't let it retry.
-      return;
+      return tooManyAttemptsOrTimeout(rpc, ex);
     }
     newTimeout(new RetryTimer(), sleepTime);
+    return rpc.getDeferred();
   }
 
   /**