You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/27 02:14:45 UTC

[kudu] 02/03: KUDU-2710: Fix KeepAliveRequest retries

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4f4f81eafd3caa303b89eaa3ac4ca2448df5867
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Feb 25 21:17:01 2019 -0600

    KUDU-2710: Fix KeepAliveRequest retries
    
    Fixes KeepAliveRequest retries by adding a partitionKey
    implementation. Without it, a null partitionKey is passed
    and the client treats the request as targeting the master table.
    
    A follow-up patch should add safeguards to prevent issues
    like this in the future and fix any remaining retry issues.
    This patch is kept small to ensure easy backports.
    
    Change-Id: I951212ab76079e5788c2870223b45782b16509e7
    Reviewed-on: http://gerrit.cloudera.org:8080/12586
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    (cherry picked from commit 6302811eb73efdfd2a3da84c25f5d6589302dee1)
    Reviewed-on: http://gerrit.cloudera.org:8080/12608
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java    |  6 ++++++
 .../main/java/org/apache/kudu/client/RpcProxy.java  | 21 +++++++++++++++++++++
 .../java/org/apache/kudu/client/TestKuduClient.java | 10 +++++++++-
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 36bb0af..fd085b4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -864,6 +864,12 @@ public final class AsyncKuduScanner {
     }
 
     @Override
+    public byte[] partitionKey() {
+      // This key is used to lookup where the request needs to go
+      return pruner.nextPartitionKey();
+    }
+
+    @Override
     Pair<Void, Object> deserialize(final CallResponse callResponse,
                                    String tsUUID) throws KuduException {
       ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 347e3c8..5a2728d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -60,6 +60,9 @@ class RpcProxy {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
 
+  private static int staticNumFail = 0;
+  private static Exception staticException = null;
+
   /** The reference to the top-level Kudu client object. */
   @Nonnull
   private final AsyncKuduClient client;
@@ -90,6 +93,18 @@ class RpcProxy {
   }
 
   /**
+   * Fails the next numFail RPCs by throwing the passed exception.
+   * @param numFail the number of RPCs to fail
+   * @param exception the exception to throw when failing an rpc
+   */
+  @InterfaceAudience.LimitedPrivate("Test")
+  static void failNextRpcs(int numFail, Exception exception) {
+    Preconditions.checkNotNull(exception);
+    staticNumFail = numFail;
+    staticException = exception;
+  }
+
+  /**
    * Send the specified RPC using the connection to the Kudu server.
    *
    * @param <R> type of the RPC
@@ -101,6 +116,12 @@ class RpcProxy {
                           final Connection connection,
                           final KuduRpc<R> rpc) {
     try {
+      // Throw an exception to enable testing failures. See `failNextRpcs`.
+      if (staticNumFail > 0) {
+        staticNumFail--;
+        LOG.warn("Forcing a failure on sendRpc: " + rpc);
+        throw staticException;
+      }
       if (!rpc.getRequiredFeatures().isEmpty()) {
         // An extra optimization: when the peer's features are already known, check that the server
         // supports feature flags, if those are required.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 303f53a..576ec88 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -47,6 +47,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -55,10 +56,11 @@ import java.util.concurrent.Future;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
 
+import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.LocationConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
-import org.apache.kudu.test.ClientTestUtil;
+import org.apache.kudu.test.RandomUtils;
 import org.apache.kudu.util.TimestampUtil;
 import org.junit.Before;
 import org.junit.Rule;
@@ -286,8 +288,14 @@ public class TestKuduClient {
     // Wait for longer than the scanner ttl calling keepAlive throughout.
     // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
     // we extend over 2x the scanner ttl.
+    Random random = RandomUtils.getRandom();
     for (int i = 0; i < 10; i++) {
       Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+      // Force 1/3 of the keepAlive requests to retry up to 3 times.
+      if (i % 3 == 0) {
+        RpcProxy.failNextRpcs(random.nextInt(4),
+            new RecoverableException(Status.ServiceUnavailable("testKeepAlive")));
+      }
       scanner.keepAlive();
     }