You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:15:34 UTC

[flink] 05/05: [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2284f777ecd3b62b412bd0fdb9dbcf492314c589
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon May 13 15:45:03 2019 +0200

    [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
    
    Use latches instead of timeouts/sleeps to test problematic thread interleaving.
    
    This closes #8415.
---
 .../ResourceManagerTaskExecutorTest.java           | 38 +++++++++++++++-------
 .../flink/runtime/rpc/TestingRpcService.java       | 29 +++++++----------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index fcb92af..63d8245 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -217,23 +219,36 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	 */
 	@Test
 	public void testDelayedRegisterTaskExecutor() throws Exception {
-		// additional delay over RPC timeout
-		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
-		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		final Time fastTimeout = Time.milliseconds(1L);
 		try {
-			// first registration is with connection delay longer than timeout expecting timeout and then retry
-			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			final OneShotLatch startConnection = new OneShotLatch();
+			final OneShotLatch finishConnection = new OneShotLatch();
+
+			// first registration is with blocking connection
+			rpcService.setRpcGatewayFutureFunction(rpcGateway ->
+				CompletableFuture.supplyAsync(
+					() -> {
+						startConnection.trigger();
+						try {
+							finishConnection.await();
+						} catch (InterruptedException ignored) {}
+						return rpcGateway;
+					},
+					TestingUtils.defaultExecutor()));
+
 			CompletableFuture<RegistrationResponse> firstFuture =
-				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
 			try {
 				firstFuture.get();
 				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
 			} catch (Exception e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
 			}
 
+			startConnection.await();
+
 			// second registration after timeout is with no delay, expecting it to be succeeded
-			rpcService.setConnectionDelayMillis(0);
+			rpcService.resetRpcGatewayFutureFunction();
 			CompletableFuture<RegistrationResponse> secondFuture =
 				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 			RegistrationResponse response = secondFuture.get();
@@ -244,8 +259,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			rmGateway.sendSlotReport(taskExecutorResourceID,
 				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
 
-			// wait enough for the first registration's connection delay to be over letting its remaining part go through
-			Thread.sleep(additionalDelayMillis * 2);
+			// let the remaining part of the first registration proceed
+			finishConnection.trigger();
+			Thread.sleep(1L);
 
 			// verify that the latest registration is valid not being unregistered by the delayed one
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
@@ -254,7 +270,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
 		} finally {
-			rpcService.setConnectionDelayMillis(0L);
+			rpcService.resetRpcGatewayFutureFunction();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 6d12266..f11269d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,11 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
+	private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
+
 	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
-	/** Artificial delay on connection */
-	private long connectionDelayMillis;
+	private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
 
 	/**
 	 * Creates a new {@code TestingRpcService}.
@@ -103,19 +104,9 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
-		if (connectionDelayMillis <= 0) {
-			return CompletableFuture.completedFuture(gateway);
-		} else {
-			return CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						Thread.sleep(connectionDelayMillis);
-					} catch (InterruptedException ignored) {}
-					return gateway;
-				},
-				TestingUtils.defaultExecutor());
-		}
+		return (CompletableFuture<C>) rpcGatewayFutureFunction.apply(gateway);
 	}
 
 	@Override
@@ -159,7 +150,11 @@ public class TestingRpcService extends AkkaRpcService {
 		registeredConnections.clear();
 	}
 
-	public void setConnectionDelayMillis(long connectionDelayMillis) {
-		this.connectionDelayMillis = connectionDelayMillis;
+	public void resetRpcGatewayFutureFunction() {
+		rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
+	}
+
+	public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
+		this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
 	}
 }