You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:18:56 UTC
[flink] 02/02: [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca85285cda0f0cb6f82ed55a25aa4c439be1c2b2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon May 13 15:45:03 2019 +0200
[FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
Use latches instead of timeouts/sleeps to test problematic thread interleaving.
This closes #8415.
---
.../ResourceManagerTaskExecutorTest.java | 39 ++++++++++++++++------
.../flink/runtime/rpc/TestingRpcService.java | 29 +++++++---------
2 files changed, 40 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index d1532a5..0d80bcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -237,23 +240,36 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
*/
@Test
public void testDelayedRegisterTaskExecutor() throws Exception {
- // additional delay over RPC timeout
- // use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
- final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+ final Time fastTimeout = Time.milliseconds(1L);
try {
- // first registration is with connection delay longer than timeout expecting timeout and then retry
- rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+ final OneShotLatch startConnection = new OneShotLatch();
+ final OneShotLatch finishConnection = new OneShotLatch();
+
+ // first registration is with blocking connection
+ rpcService.setRpcGatewayFutureFunction(rpcGateway ->
+ CompletableFuture.supplyAsync(
+ () -> {
+ startConnection.trigger();
+ try {
+ finishConnection.await();
+ } catch (InterruptedException ignored) {}
+ return rpcGateway;
+ },
+ TestingUtils.defaultExecutor()));
+
CompletableFuture<RegistrationResponse> firstFuture =
- rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
try {
firstFuture.get();
fail("Should have failed because connection to taskmanager is delayed beyond timeout");
} catch (Exception e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
}
+ startConnection.await();
+
// second registration after timeout is with no delay, expecting it to be succeeded
- rpcService.setConnectionDelayMillis(0);
+ rpcService.resetRpcGatewayFutureFunction();
CompletableFuture<RegistrationResponse> secondFuture =
rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
RegistrationResponse response = secondFuture.get();
@@ -264,8 +280,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
rmGateway.sendSlotReport(taskExecutorResourceID,
((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
- // wait enough for the first registration's connection delay to be over letting its remaining part go through
- Thread.sleep(additionalDelayMillis * 2);
+ // let the remaining part of the first registration proceed
+ finishConnection.trigger();
+ Thread.sleep(1L);
// verify that the latest registration is valid not being unregistered by the delayed one
final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
@@ -274,7 +291,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
} finally {
- rpcService.setConnectionDelayMillis(0L);
+ rpcService.resetRpcGatewayFutureFunction();
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 4f1ae66a..fab90c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,11 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TestingRpcService extends AkkaRpcService {
+ private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
+
/** Map of pre-registered connections. */
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
- /** Artificial delay on connection. */
- private long connectionDelayMillis;
+ private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
/**
* Creates a new {@code TestingRpcService}.
@@ -102,19 +103,9 @@ public class TestingRpcService extends AkkaRpcService {
}
}
+ @SuppressWarnings("unchecked")
private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
- if (connectionDelayMillis <= 0) {
- return CompletableFuture.completedFuture(gateway);
- } else {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- Thread.sleep(connectionDelayMillis);
- } catch (InterruptedException ignored) {}
- return gateway;
- },
- TestingUtils.defaultExecutor());
- }
+ return (CompletableFuture<C>) rpcGatewayFutureFunction.apply(gateway);
}
@Override
@@ -158,7 +149,11 @@ public class TestingRpcService extends AkkaRpcService {
registeredConnections.clear();
}
- public void setConnectionDelayMillis(long connectionDelayMillis) {
- this.connectionDelayMillis = connectionDelayMillis;
+ public void resetRpcGatewayFutureFunction() {
+ rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
+ }
+
+ public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
+ this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
}
}