You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:18:54 UTC

[flink] branch release-1.7 updated (e5211d8 -> ca85285)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e5211d8  [FLINK-9445][scala] Scala-shell uses JAVA_RUN
     new bcd35b9  [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
     new ca85285  [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/resourcemanager/ResourceManager.java   |  29 ++++--
 .../ResourceManagerTaskExecutorTest.java           | 102 +++++++++++++++++----
 .../flink/runtime/rpc/TestingRpcService.java       |  28 +++++-
 3 files changed, 128 insertions(+), 31 deletions(-)


[flink] 01/02: [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bcd35b9b51e96b231bc64d3b45583bfcf47c3d18
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700

    [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
    
    TaskExecutor registration is an asynchronous process, which allows a
    retry issued after a timeout to be processed ahead of the earlier
    request. Such a delayed, timed-out request can accidentally unregister a
    valid task manager, whose slots are then never reported to the job
    manager. This patch tracks the ongoing TaskExecutor gateway futures per
    resource ID to prevent such a race.
---
 .../runtime/resourcemanager/ResourceManager.java   | 29 +++++---
 .../ResourceManagerTaskExecutorTest.java           | 85 ++++++++++++++++++----
 .../flink/runtime/rpc/TestingRpcService.java       | 33 +++++++--
 3 files changed, 116 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f90f63a..eca7f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -117,6 +117,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All currently registered TaskExecutors with there framework specific worker information. */
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
 
+	/** Ongoing registration of TaskExecutors per resource ID. */
+	private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
 	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -193,10 +196,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		this.jmResourceIdRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
 		infoMessageListeners = new ConcurrentHashMap<>(8);
+		this.taskExecutorGatewayFutures = new HashMap<>(8);
 	}
 
-
-
 	// ------------------------------------------------------------------------
 	//  RPC lifecycle methods
 	// ------------------------------------------------------------------------
@@ -356,18 +358,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			final Time timeout) {
 
 		CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+		taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
 
 		return taskExecutorGatewayFuture.handleAsync(
 			(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
+				if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+					taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						return registerTaskExecutorInternal(
+							taskExecutorGateway,
+							taskExecutorAddress,
+							taskExecutorResourceId,
+							dataPort,
+							hardwareDescription);
+					}
 				} else {
-					return registerTaskExecutorInternal(
-						taskExecutorGateway,
-						taskExecutorAddress,
-						taskExecutorResourceId,
-						dataPort,
-						hardwareDescription);
+					log.info("Ignoring outdated TaskExecutorGateway connection.");
+					return new RegistrationResponse.Decline("Decline outdated task executor registration.");
 				}
 			},
 			getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index c3878ff..d1532a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -37,12 +39,15 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,9 +67,11 @@ import static org.mockito.Mockito.when;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
 
-	private final Time timeout = Time.seconds(10L);
+	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private TestingRpcService rpcService;
+	private static final long HEARTBEAT_TIMEOUT = 5000;
+
+	private static TestingRpcService rpcService;
 
 	private int dataPort = 1234;
 
@@ -95,14 +102,14 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
 		rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
-			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-		grantLeadership(rmLeaderElectionService).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		grantLeadership(rmLeaderElectionService).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
 	@After
 	public void teardown() throws Exception {
-		RpcUtils.terminateRpcService(rpcService, timeout);
+		RpcUtils.terminateRpcService(rpcService, TIMEOUT);
 	}
 
 	/**
@@ -114,17 +121,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
-			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
 				taskExecutorResourceID,
-				timeout).get();
+				TIMEOUT).get();
 			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -136,17 +143,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive registration with unmatched leadershipId from task executor
+	 * Test receive registration with unmatched leadershipId from task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 
 			try {
-				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 				fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
 			} catch (ExecutionException e) {
 				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
@@ -159,7 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive registration with invalid address from task executor
+	 * Test receive registration with invalid address from task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
@@ -167,8 +174,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
-			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
+				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			assertTrue(invalidAddressFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();
@@ -187,7 +194,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 
 		SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -224,4 +231,50 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		return leaderElectionService.isLeader(leaderSessionId);
 	}
 
+	/**
+	 * Test delayed registration of task executor where the delay is introduced during connection from resource manager
+	 * to the registering task executor.
+	 */
+	@Test
+	public void testDelayedRegisterTaskExecutor() throws Exception {
+		// additional delay over RPC timeout
+		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
+		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		try {
+			// first registration is with connection delay longer than timeout expecting timeout and then retry
+			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			CompletableFuture<RegistrationResponse> firstFuture =
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			try {
+				firstFuture.get();
+				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+			} catch (Exception e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			// second registration after timeout is with no delay, expecting it to be succeeded
+			rpcService.setConnectionDelayMillis(0);
+			CompletableFuture<RegistrationResponse> secondFuture =
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = secondFuture.get();
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// on success, send slot report for taskmanager registration
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+			rmGateway.sendSlotReport(taskExecutorResourceID,
+				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+			// wait enough for the first registration's connection delay to be over letting its remaining part go through
+			Thread.sleep(additionalDelayMillis * 2);
+
+			// verify that the latest registration is valid not being unregistered by the delayed one
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				TIMEOUT).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+		} finally {
+			rpcService.setConnectionDelayMillis(0L);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..4f1ae66a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -52,18 +53,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
-	/** Map of pre-registered connections */
+	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
+	/** Artificial delay on connection. */
+	private long connectionDelayMillis;
+
 	/**
-	 * Creates a new {@code TestingRpcService}. 
+	 * Creates a new {@code TestingRpcService}.
 	 */
 	public TestingRpcService() {
 		this(new Configuration());
 	}
 
 	/**
-	 * Creates a new {@code TestingRpcService}, using the given configuration. 
+	 * Creates a new {@code TestingRpcService}, using the given configuration.
 	 */
 	public TestingRpcService(Configuration configuration) {
 		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
@@ -98,6 +102,21 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+		if (connectionDelayMillis <= 0) {
+			return CompletableFuture.completedFuture(gateway);
+		} else {
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						Thread.sleep(connectionDelayMillis);
+					} catch (InterruptedException ignored) {}
+					return gateway;
+				},
+				TestingUtils.defaultExecutor());
+		}
+	}
+
 	@Override
 	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
@@ -106,7 +125,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -126,7 +145,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -138,4 +157,8 @@ public class TestingRpcService extends AkkaRpcService {
 	public void clearGateways() {
 		registeredConnections.clear();
 	}
+
+	public void setConnectionDelayMillis(long connectionDelayMillis) {
+		this.connectionDelayMillis = connectionDelayMillis;
+	}
 }


[flink] 02/02: [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca85285cda0f0cb6f82ed55a25aa4c439be1c2b2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon May 13 15:45:03 2019 +0200

    [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
    
    Use latches instead of timeouts/sleeps to test problematic thread interleaving.
    
    This closes #8415.
---
 .../ResourceManagerTaskExecutorTest.java           | 39 ++++++++++++++++------
 .../flink/runtime/rpc/TestingRpcService.java       | 29 +++++++---------
 2 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index d1532a5..0d80bcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -237,23 +240,36 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	 */
 	@Test
 	public void testDelayedRegisterTaskExecutor() throws Exception {
-		// additional delay over RPC timeout
-		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
-		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		final Time fastTimeout = Time.milliseconds(1L);
 		try {
-			// first registration is with connection delay longer than timeout expecting timeout and then retry
-			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			final OneShotLatch startConnection = new OneShotLatch();
+			final OneShotLatch finishConnection = new OneShotLatch();
+
+			// first registration is with blocking connection
+			rpcService.setRpcGatewayFutureFunction(rpcGateway ->
+				CompletableFuture.supplyAsync(
+					() -> {
+						startConnection.trigger();
+						try {
+							finishConnection.await();
+						} catch (InterruptedException ignored) {}
+						return rpcGateway;
+					},
+					TestingUtils.defaultExecutor()));
+
 			CompletableFuture<RegistrationResponse> firstFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
 			try {
 				firstFuture.get();
 				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
 			} catch (Exception e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
 			}
 
+			startConnection.await();
+
 			// second registration after timeout is with no delay, expecting it to be succeeded
-			rpcService.setConnectionDelayMillis(0);
+			rpcService.resetRpcGatewayFutureFunction();
 			CompletableFuture<RegistrationResponse> secondFuture =
 				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 			RegistrationResponse response = secondFuture.get();
@@ -264,8 +280,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			rmGateway.sendSlotReport(taskExecutorResourceID,
 				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
 
-			// wait enough for the first registration's connection delay to be over letting its remaining part go through
-			Thread.sleep(additionalDelayMillis * 2);
+			// let the remaining part of the first registration proceed
+			finishConnection.trigger();
+			Thread.sleep(1L);
 
 			// verify that the latest registration is valid not being unregistered by the delayed one
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
@@ -274,7 +291,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
 		} finally {
-			rpcService.setConnectionDelayMillis(0L);
+			rpcService.resetRpcGatewayFutureFunction();
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 4f1ae66a..fab90c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,11 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
+	private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
+
 	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
-	/** Artificial delay on connection. */
-	private long connectionDelayMillis;
+	private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
 
 	/**
 	 * Creates a new {@code TestingRpcService}.
@@ -102,19 +103,9 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
-		if (connectionDelayMillis <= 0) {
-			return CompletableFuture.completedFuture(gateway);
-		} else {
-			return CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						Thread.sleep(connectionDelayMillis);
-					} catch (InterruptedException ignored) {}
-					return gateway;
-				},
-				TestingUtils.defaultExecutor());
-		}
+		return (CompletableFuture<C>) rpcGatewayFutureFunction.apply(gateway);
 	}
 
 	@Override
@@ -158,7 +149,11 @@ public class TestingRpcService extends AkkaRpcService {
 		registeredConnections.clear();
 	}
 
-	public void setConnectionDelayMillis(long connectionDelayMillis) {
-		this.connectionDelayMillis = connectionDelayMillis;
+	public void resetRpcGatewayFutureFunction() {
+		rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
+	}
+
+	public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
+		this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
 	}
 }