You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:18:55 UTC
[flink] 01/02: [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bcd35b9b51e96b231bc64d3b45583bfcf47c3d18
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700
[FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
TaskExecutor registration is an asynchronous process, which allows the next
retry after a timeout to be processed ahead of the earlier request. Such a
delayed, timed-out request can accidentally unregister a valid task
manager, whose slots are then never reported to the job manager. This
patch tracks the ongoing task executor connection future per resource ID
and declines outdated registrations, which prevents this race.
---
.../runtime/resourcemanager/ResourceManager.java | 29 +++++---
.../ResourceManagerTaskExecutorTest.java | 85 ++++++++++++++++++----
.../flink/runtime/rpc/TestingRpcService.java | 33 +++++++--
3 files changed, 116 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f90f63a..eca7f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -117,6 +117,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
/** All currently registered TaskExecutors with there framework specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
+ /** Ongoing registration of TaskExecutors per resource ID. */
+ private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
/** High availability services for leader retrieval and election. */
private final HighAvailabilityServices highAvailabilityServices;
@@ -193,10 +196,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
this.jmResourceIdRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
infoMessageListeners = new ConcurrentHashMap<>(8);
+ this.taskExecutorGatewayFutures = new HashMap<>(8);
}
-
-
// ------------------------------------------------------------------------
// RPC lifecycle methods
// ------------------------------------------------------------------------
@@ -356,18 +358,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
final Time timeout) {
CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+ taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
return taskExecutorGatewayFuture.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
- if (throwable != null) {
- return new RegistrationResponse.Decline(throwable.getMessage());
+ if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+ taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ } else {
+ return registerTaskExecutorInternal(
+ taskExecutorGateway,
+ taskExecutorAddress,
+ taskExecutorResourceId,
+ dataPort,
+ hardwareDescription);
+ }
} else {
- return registerTaskExecutorInternal(
- taskExecutorGateway,
- taskExecutorAddress,
- taskExecutorResourceId,
- dataPort,
- hardwareDescription);
+ log.info("Ignoring outdated TaskExecutorGateway connection.");
+ return new RegistrationResponse.Decline("Decline outdated task executor registration.");
}
},
getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index c3878ff..d1532a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -37,12 +39,15 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
+import akka.pattern.AskTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -62,9 +67,11 @@ import static org.mockito.Mockito.when;
public class ResourceManagerTaskExecutorTest extends TestLogger {
- private final Time timeout = Time.seconds(10L);
+ private static final Time TIMEOUT = Time.seconds(10L);
- private TestingRpcService rpcService;
+ private static final long HEARTBEAT_TIMEOUT = 5000;
+
+ private static TestingRpcService rpcService;
private int dataPort = 1234;
@@ -95,14 +102,14 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
- grantLeadership(rmLeaderElectionService).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ grantLeadership(rmLeaderElectionService).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}
@After
public void teardown() throws Exception {
- RpcUtils.terminateRpcService(rpcService, timeout);
+ RpcUtils.terminateRpcService(rpcService, TIMEOUT);
}
/**
@@ -114,17 +121,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test response successful
CompletableFuture<RegistrationResponse> successfulFuture =
- rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
- RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ RegistrationResponse response = successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
taskExecutorResourceID,
- timeout).get();
+ TIMEOUT).get();
assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
CompletableFuture<RegistrationResponse> duplicateFuture =
- rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
RegistrationResponse duplicateResponse = duplicateFuture.get();
assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -136,17 +143,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
}
/**
- * Test receive registration with unmatched leadershipId from task executor
+ * Test receive registration with unmatched leadershipId from task executor.
*/
@Test
public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
try {
// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
- wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+ wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
try {
- unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
@@ -159,7 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
}
/**
- * Test receive registration with invalid address from task executor
+ * Test receive registration with invalid address from task executor.
*/
@Test
public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
@@ -167,8 +174,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
// test throw exception when receive a registration from taskExecutor which takes invalid address
String invalidAddress = "/taskExecutor2";
CompletableFuture<RegistrationResponse> invalidAddressFuture =
- rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
- assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
+ rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ assertTrue(invalidAddressFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
testingFatalErrorHandler.rethrowError();
@@ -187,7 +194,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+ HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -224,4 +231,50 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
return leaderElectionService.isLeader(leaderSessionId);
}
+ /**
+ * Test delayed registration of task executor where the delay is introduced during connection from resource manager
+ * to the registering task executor.
+ */
+ @Test
+ public void testDelayedRegisterTaskExecutor() throws Exception {
+ // additional delay over RPC timeout
+ // use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
+ final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+ try {
+ // first registration is with connection delay longer than timeout expecting timeout and then retry
+ rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+ CompletableFuture<RegistrationResponse> firstFuture =
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ try {
+ firstFuture.get();
+ fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ // second registration after timeout is with no delay, expecting it to be succeeded
+ rpcService.setConnectionDelayMillis(0);
+ CompletableFuture<RegistrationResponse> secondFuture =
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+ RegistrationResponse response = secondFuture.get();
+ assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+ // on success, send slot report for taskmanager registration
+ final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+ rmGateway.sendSlotReport(taskExecutorResourceID,
+ ((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+ // wait enough for the first registration's connection delay to be over letting its remaining part go through
+ Thread.sleep(additionalDelayMillis * 2);
+
+ // verify that the latest registration is valid not being unregistered by the delayed one
+ final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+ taskExecutorResourceID,
+ TIMEOUT).get();
+ assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+ assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+ } finally {
+ rpcService.setConnectionDelayMillis(0L);
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..4f1ae66a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
@@ -52,18 +53,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TestingRpcService extends AkkaRpcService {
- /** Map of pre-registered connections */
+ /** Map of pre-registered connections. */
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+ /** Artificial delay on connection. */
+ private long connectionDelayMillis;
+
/**
- * Creates a new {@code TestingRpcService}.
+ * Creates a new {@code TestingRpcService}.
*/
public TestingRpcService() {
this(new Configuration());
}
/**
- * Creates a new {@code TestingRpcService}, using the given configuration.
+ * Creates a new {@code TestingRpcService}, using the given configuration.
*/
public TestingRpcService(Configuration configuration) {
super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
@@ -98,6 +102,21 @@ public class TestingRpcService extends AkkaRpcService {
}
}
+ private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+ if (connectionDelayMillis <= 0) {
+ return CompletableFuture.completedFuture(gateway);
+ } else {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Thread.sleep(connectionDelayMillis);
+ } catch (InterruptedException ignored) {}
+ return gateway;
+ },
+ TestingUtils.defaultExecutor());
+ }
+ }
+
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
@@ -106,7 +125,7 @@ public class TestingRpcService extends AkkaRpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return CompletableFuture.completedFuture(typedGateway);
+ return getRpcGatewayFuture(typedGateway);
} else {
return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
@@ -126,7 +145,7 @@ public class TestingRpcService extends AkkaRpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return CompletableFuture.completedFuture(typedGateway);
+ return getRpcGatewayFuture(typedGateway);
} else {
return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
@@ -138,4 +157,8 @@ public class TestingRpcService extends AkkaRpcService {
public void clearGateways() {
registeredConnections.clear();
}
+
+ public void setConnectionDelayMillis(long connectionDelayMillis) {
+ this.connectionDelayMillis = connectionDelayMillis;
+ }
}