You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:34 UTC

[13/14] flink git commit: [FLINK-8420] [flip6] Recognize TimeoutException in RetryingRegistration

[FLINK-8420] [flip6] Recognize TimeoutException in RetryingRegistration

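A TimeoutException will now trigger an exponential backoff with respect to the connection
timeout. Because the failure passed to the registration callback may be wrapped in a
CompletionException, it is first unwrapped via ExceptionUtils.stripCompletionException
before checking for a TimeoutException. This guarantees that we don't overload the network
with connection requests, while still connecting quickly to a newly available target.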

This closes #5286.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c99ae89
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c99ae89
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c99ae89

Branch: refs/heads/master
Commit: 3c99ae8959f69325bb9b7d810b41c60e42e602c5
Parents: d0bc300
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 12 14:02:05 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 17:54:10 2018 +0100

----------------------------------------------------------------------
 .../registration/RetryingRegistration.java      |  3 +-
 .../registration/RetryingRegistrationTest.java  | 48 ++++++++++++++++----
 2 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c99ae89/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 802d361..279714b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 
@@ -263,7 +264,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 			registrationAcceptFuture.whenCompleteAsync(
 				(Void v, Throwable failure) -> {
 					if (failure != null && !isCanceled()) {
-						if (failure instanceof TimeoutException) {
+						if (ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException) {
 							// we simply have not received a response in time. maybe the timeout was
 							// very low (initial fast registration attempts), maybe the target endpoint is
 							// currently down.

http://git-wip-us.apache.org/repos/asf/flink/blob/3c99ae89/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 7fc6897..885a7f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -160,7 +160,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		}
 	}
 
-	@Test
+	@Test(timeout = 10000)
 	public void testRetriesOnTimeouts() throws Exception {
 		final String testId = "rien ne va plus";
 		final String testEndpointAddress = "<test-address>";
@@ -178,7 +178,15 @@ public class RetryingRegistrationTest extends TestLogger {
 		try {
 			rpc.registerGateway(testEndpointAddress, testGateway);
 
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+			final long initialTimeout = 20L;
+			TestRetryingRegistration registration = new TestRetryingRegistration(
+				rpc,
+				testEndpointAddress,
+				leaderId,
+				initialTimeout,
+				1000L,
+				15000L, // make sure that we timeout in case of an error
+				15000L);
 
 			long started = System.nanoTime();
 			registration.startRegistration();
@@ -195,7 +203,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
 
 			// validate that some retry-delay / back-off behavior happened
-			assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+			assertTrue("retries did not properly back off", elapsedMillis >= 3 * initialTimeout);
 		}
 		finally {
 			rpc.stopService();
@@ -346,11 +354,35 @@ public class RetryingRegistrationTest extends TestLogger {
 		static final long DELAY_ON_DECLINE = 200;
 
 		public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
-			super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
-					rpc, "TestEndpoint",
-					TestRegistrationGateway.class,
-					targetAddress, leaderId,
-					INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
+			this(
+				rpc,
+				targetAddress,
+				leaderId,
+				INITIAL_TIMEOUT,
+				MAX_TIMEOUT,
+				DELAY_ON_ERROR,
+				DELAY_ON_DECLINE);
+		}
+
+		public TestRetryingRegistration(
+				RpcService rpc,
+				String targetAddress,
+				UUID leaderId,
+				long initialTimeout,
+				long maxTimeout,
+				long delayOnError,
+				long delayOnDecline) {
+			super(
+				LoggerFactory.getLogger(RetryingRegistrationTest.class),
+				rpc,
+				"TestEndpoint",
+				TestRegistrationGateway.class,
+				targetAddress,
+				leaderId,
+				initialTimeout,
+				maxTimeout,
+				delayOnError,
+				delayOnDecline);
 		}
 
 		@Override