You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:34 UTC
[13/14] flink git commit: [FLINK-8420] [flip6] Recognize TimeoutException in RetryingRegistration
[FLINK-8420] [flip6] Recognize TimeoutException in RetryingRegistration
A timeout exception will trigger an exponential backoff with respect to the connection timeout.
This guarantees that we don't overload the network with connection requests, while still
allowing us to connect quickly to a newly available target.
This closes #5286.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c99ae89
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c99ae89
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c99ae89
Branch: refs/heads/master
Commit: 3c99ae8959f69325bb9b7d810b41c60e42e602c5
Parents: d0bc300
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 12 14:02:05 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 17:54:10 2018 +0100
----------------------------------------------------------------------
.../registration/RetryingRegistration.java | 3 +-
.../registration/RetryingRegistrationTest.java | 48 ++++++++++++++++----
2 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c99ae89/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 802d361..279714b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -263,7 +264,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
registrationAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
if (failure != null && !isCanceled()) {
- if (failure instanceof TimeoutException) {
+ if (ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException) {
// we simply have not received a response in time. maybe the timeout was
// very low (initial fast registration attempts), maybe the target endpoint is
// currently down.
http://git-wip-us.apache.org/repos/asf/flink/blob/3c99ae89/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 7fc6897..885a7f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -160,7 +160,7 @@ public class RetryingRegistrationTest extends TestLogger {
}
}
- @Test
+ @Test(timeout = 10000)
public void testRetriesOnTimeouts() throws Exception {
final String testId = "rien ne va plus";
final String testEndpointAddress = "<test-address>";
@@ -178,7 +178,15 @@ public class RetryingRegistrationTest extends TestLogger {
try {
rpc.registerGateway(testEndpointAddress, testGateway);
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+ final long initialTimeout = 20L;
+ TestRetryingRegistration registration = new TestRetryingRegistration(
+ rpc,
+ testEndpointAddress,
+ leaderId,
+ initialTimeout,
+ 1000L,
+ 15000L, // make sure that we timeout in case of an error
+ 15000L);
long started = System.nanoTime();
registration.startRegistration();
@@ -195,7 +203,7 @@ public class RetryingRegistrationTest extends TestLogger {
assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
// validate that some retry-delay / back-off behavior happened
- assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+ assertTrue("retries did not properly back off", elapsedMillis >= 3 * initialTimeout);
}
finally {
rpc.stopService();
@@ -346,11 +354,35 @@ public class RetryingRegistrationTest extends TestLogger {
static final long DELAY_ON_DECLINE = 200;
public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
- super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
- rpc, "TestEndpoint",
- TestRegistrationGateway.class,
- targetAddress, leaderId,
- INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
+ this(
+ rpc,
+ targetAddress,
+ leaderId,
+ INITIAL_TIMEOUT,
+ MAX_TIMEOUT,
+ DELAY_ON_ERROR,
+ DELAY_ON_DECLINE);
+ }
+
+ public TestRetryingRegistration(
+ RpcService rpc,
+ String targetAddress,
+ UUID leaderId,
+ long initialTimeout,
+ long maxTimeout,
+ long delayOnError,
+ long delayOnDecline) {
+ super(
+ LoggerFactory.getLogger(RetryingRegistrationTest.class),
+ rpc,
+ "TestEndpoint",
+ TestRegistrationGateway.class,
+ targetAddress,
+ leaderId,
+ initialTimeout,
+ maxTimeout,
+ delayOnError,
+ delayOnDecline);
}
@Override