You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/27 02:14:45 UTC
[kudu] 02/03: KUDU-2710: Fix KeepAliveRequest retries
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4f4f81eafd3caa303b89eaa3ac4ca2448df5867
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Feb 25 21:17:01 2019 -0600
KUDU-2710: Fix KeepAliveRequest retries
Fixes KeepAliveRequest retries by adding a partitionKey
implementation. Without this a null partitionKey is passed
and the client treats this as a master table.
A follow up patch should include fixes to prevent issues
like this in the future and fix any remaining retry issues.
This patch is kept small to ensure easy backports.
Change-Id: I951212ab76079e5788c2870223b45782b16509e7
Reviewed-on: http://gerrit.cloudera.org:8080/12586
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
(cherry picked from commit 6302811eb73efdfd2a3da84c25f5d6589302dee1)
Reviewed-on: http://gerrit.cloudera.org:8080/12608
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
.../org/apache/kudu/client/AsyncKuduScanner.java | 6 ++++++
.../main/java/org/apache/kudu/client/RpcProxy.java | 21 +++++++++++++++++++++
.../java/org/apache/kudu/client/TestKuduClient.java | 10 +++++++++-
3 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 36bb0af..fd085b4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -864,6 +864,12 @@ public final class AsyncKuduScanner {
}
@Override
+ public byte[] partitionKey() {
+ // This key is used to lookup where the request needs to go
+ return pruner.nextPartitionKey();
+ }
+
+ @Override
Pair<Void, Object> deserialize(final CallResponse callResponse,
String tsUUID) throws KuduException {
ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 347e3c8..5a2728d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -60,6 +60,9 @@ class RpcProxy {
private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
+ private static int staticNumFail = 0;
+ private static Exception staticException = null;
+
/** The reference to the top-level Kudu client object. */
@Nonnull
private final AsyncKuduClient client;
@@ -90,6 +93,18 @@ class RpcProxy {
}
/**
+ * Fails the next numFail RPCs by throwing the passed exception.
+ * @param numFail the number of RPCs to fail
+ * @param exception the exception to throw when failing an rpc
+ */
+ @InterfaceAudience.LimitedPrivate("Test")
+ static void failNextRpcs(int numFail, Exception exception) {
+ Preconditions.checkNotNull(exception);
+ staticNumFail = numFail;
+ staticException = exception;
+ }
+
+ /**
* Send the specified RPC using the connection to the Kudu server.
*
* @param <R> type of the RPC
@@ -101,6 +116,12 @@ class RpcProxy {
final Connection connection,
final KuduRpc<R> rpc) {
try {
+ // Throw an exception to enable testing failures. See `failNextRpcs`.
+ if (staticNumFail > 0) {
+ staticNumFail--;
+ LOG.warn("Forcing a failure on sendRpc: " + rpc);
+ throw staticException;
+ }
if (!rpc.getRequiredFeatures().isEmpty()) {
// An extra optimization: when the peer's features are already known, check that the server
// supports feature flags, if those are required.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 303f53a..576ec88 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -47,6 +47,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,10 +56,11 @@ import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Deferred;
+import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.KuduTestHarness.LocationConfig;
import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
-import org.apache.kudu.test.ClientTestUtil;
+import org.apache.kudu.test.RandomUtils;
import org.apache.kudu.util.TimestampUtil;
import org.junit.Before;
import org.junit.Rule;
@@ -286,8 +288,14 @@ public class TestKuduClient {
// Wait for longer than the scanner ttl calling keepAlive throughout.
// Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
// we extend over 2x the scanner ttl.
+ Random random = RandomUtils.getRandom();
for (int i = 0; i < 10; i++) {
Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+ // Force 1/3 of the keepAlive requests to retry up to 3 times.
+ if (i % 3 == 0) {
+ RpcProxy.failNextRpcs(random.nextInt(4),
+ new RecoverableException(Status.ServiceUnavailable("testKeepAlive")));
+ }
scanner.keepAlive();
}