You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:15:33 UTC
[flink] 04/05: [FLINK-12260] Slot allocation failure by taskmanager
registration timeout and race
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07773d0d9251d6ad8c1770de985d33be8e72b032
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700
[FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
TaskExecutor registration is an asynchronous process, which allows a retry
issued after a timeout to be processed ahead of the earlier request. Such a
delayed, timed-out request can accidentally unregister a valid task
manager, whose slots are then never reported to the job manager. This
patch tracks ongoing task executor connection futures to prevent this race.
---
.../runtime/resourcemanager/ResourceManager.java | 27 +++++++----
.../ResourceManagerTaskExecutorTest.java | 52 +++++++++++++++++++++-
.../flink/runtime/rpc/TestingRpcService.java | 27 ++++++++++-
3 files changed, 95 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12860a8..03e1d87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -114,6 +114,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
/** All currently registered TaskExecutors with there framework specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
+ /** Connection futures of ongoing TaskExecutor registrations, keyed by resource ID; a connection whose future is no longer the mapped one is declined as outdated. */
+ private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
/** High availability services for leader retrieval and election. */
private final HighAvailabilityServices highAvailabilityServices;
@@ -186,6 +189,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
+ this.taskExecutorGatewayFutures = new HashMap<>(8);
}
@@ -371,18 +375,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
final Time timeout) {
CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+ taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
return taskExecutorGatewayFuture.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
- if (throwable != null) {
- return new RegistrationResponse.Decline(throwable.getMessage());
+ if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+ taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ } else {
+ return registerTaskExecutorInternal(
+ taskExecutorGateway,
+ taskExecutorAddress,
+ taskExecutorResourceId,
+ dataPort,
+ hardwareDescription);
+ }
} else {
- return registerTaskExecutorInternal(
- taskExecutorGateway,
- taskExecutorAddress,
- taskExecutorResourceId,
- dataPort,
- hardwareDescription);
+ log.info("Ignoring outdated TaskExecutorGateway connection.");
+ return new RegistrationResponse.Decline("Decline outdated task executor registration.");
}
},
getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 95b3d08..fcb92af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import akka.pattern.AskTimeoutException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -79,6 +80,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private static final Time TIMEOUT = Time.seconds(10L);
+ private static final long HEARTBEAT_TIMEOUT = 5000; // in ms; heartbeat timeout passed to the resource manager's HeartbeatServices
+
private static TestingRpcService rpcService;
private TestingTaskExecutorGateway taskExecutorGateway;
@@ -133,7 +136,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+ HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -209,6 +212,53 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
}
/**
+ * Tests delayed registration of a task executor, where the delay is injected into the connection from the resource manager
+ * to the registering task executor, and verifies that the outdated registration does not unregister the newer one.
+ */
+ @Test
+ public void testDelayedRegisterTaskExecutor() throws Exception {
+ // extra connection delay on top of the RPC timeout
+ // kept well below half the heartbeat timeout: the test later sleeps twice this value and must not hit the heartbeat timeout
+ final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+ try {
+ // first registration: the connection delay exceeds the RPC timeout, so this attempt is expected to time out (a retry follows)
+ rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+ CompletableFuture<RegistrationResponse> firstFuture =
+ rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ try {
+ firstFuture.get();
+ fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ // second registration (the retry) has no connection delay and is expected to succeed
+ rpcService.setConnectionDelayMillis(0);
+ CompletableFuture<RegistrationResponse> secondFuture =
+ rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ RegistrationResponse response = secondFuture.get();
+ assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+ // on success, send a slot report to complete the task manager registration
+ final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+ rmGateway.sendSlotReport(taskExecutorResourceID,
+ ((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+ // wait long enough for the first registration's delayed connection to complete, so its remaining handling runs
+ Thread.sleep(additionalDelayMillis * 2);
+
+ // verify that the latest registration is still valid, i.e. it was not unregistered by the delayed one
+ final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+ taskExecutorResourceID,
+ TIMEOUT).get();
+ assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+ assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+ } finally {
+ rpcService.setConnectionDelayMillis(0L);
+ }
+ }
+
+ /**
* Tests that a TaskExecutor can disconnect from the {@link ResourceManager}.
*/
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f42f09c..6d12266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,9 @@ public class TestingRpcService extends AkkaRpcService {
/** Map of pre-registered connections. */
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+ /** Artificial connection delay in milliseconds (non-positive means none); volatile since tests set it while RPC/executor threads read it. */
+ private volatile long connectionDelayMillis;
+
/**
* Creates a new {@code TestingRpcService}.
*/
@@ -99,6 +103,21 @@ public class TestingRpcService extends AkkaRpcService {
}
}
+ /** Returns {@code gateway} as a future, completed after the configured connection delay (if positive) on the test executor. */
+ private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+     if (connectionDelayMillis <= 0) {
+         return CompletableFuture.completedFuture(gateway);
+     }
+     return CompletableFuture.supplyAsync(() -> {
+         try {
+             Thread.sleep(connectionDelayMillis);
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the flag instead of silently swallowing the interrupt
+         }
+         return gateway;
+     }, TestingUtils.defaultExecutor());
+ }
+
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
@@ -107,7 +126,7 @@ public class TestingRpcService extends AkkaRpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return CompletableFuture.completedFuture(typedGateway);
+ return getRpcGatewayFuture(typedGateway);
} else {
return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
@@ -127,7 +146,7 @@ public class TestingRpcService extends AkkaRpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return CompletableFuture.completedFuture(typedGateway);
+ return getRpcGatewayFuture(typedGateway);
} else {
return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
@@ -139,4 +158,8 @@ public class TestingRpcService extends AkkaRpcService {
public void clearGateways() {
registeredConnections.clear();
}
+
+ public void setConnectionDelayMillis(long connectionDelayMillis) {
+ this.connectionDelayMillis = connectionDelayMillis; // read by connect() for pre-registered gateways; a non-positive value disables the delay
+ }
}