You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:15:33 UTC

[flink] 04/05: [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07773d0d9251d6ad8c1770de985d33be8e72b032
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700

    [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
    
    TaskExecutor registration has asynchronous process, which allows a next
    retry after timeout to be processed first ahead of earlier request. Such
    delayed timed-out request can accidently unregister a valid task
    manager, whose slots are permanently not reported to job manager. This
    patch introduces ongoing task executor futures to prevent such race.
---
 .../runtime/resourcemanager/ResourceManager.java   | 27 +++++++----
 .../ResourceManagerTaskExecutorTest.java           | 52 +++++++++++++++++++++-
 .../flink/runtime/rpc/TestingRpcService.java       | 27 ++++++++++-
 3 files changed, 95 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12860a8..03e1d87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -114,6 +114,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All currently registered TaskExecutors with there framework specific worker information. */
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
 
+	/** Ongoing registration of TaskExecutors per resource ID. */
+	private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
 	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -186,6 +189,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		this.jobManagerRegistrations = new HashMap<>(4);
 		this.jmResourceIdRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
+		this.taskExecutorGatewayFutures = new HashMap<>(8);
 	}
 
 
@@ -371,18 +375,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			final Time timeout) {
 
 		CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+		taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
 
 		return taskExecutorGatewayFuture.handleAsync(
 			(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
+				if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+					taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						return registerTaskExecutorInternal(
+							taskExecutorGateway,
+							taskExecutorAddress,
+							taskExecutorResourceId,
+							dataPort,
+							hardwareDescription);
+					}
 				} else {
-					return registerTaskExecutorInternal(
-						taskExecutorGateway,
-						taskExecutorAddress,
-						taskExecutorResourceId,
-						dataPort,
-						hardwareDescription);
+					log.info("Ignoring outdated TaskExecutorGateway connection.");
+					return new RegistrationResponse.Decline("Decline outdated task executor registration.");
 				}
 			},
 			getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 95b3d08..fcb92af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -79,6 +80,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
+	private static final long HEARTBEAT_TIMEOUT = 5000;
+
 	private static TestingRpcService rpcService;
 
 	private TestingTaskExecutorGateway taskExecutorGateway;
@@ -133,7 +136,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 
 		SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -209,6 +212,53 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
+	 * Test delayed registration of task executor where the delay is introduced during connection from resource manager
+	 * to the registering task executor.
+	 */
+	@Test
+	public void testDelayedRegisterTaskExecutor() throws Exception {
+		// additional delay over RPC timeout
+		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
+		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		try {
+			// first registration is with connection delay longer than timeout expecting timeout and then retry
+			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			CompletableFuture<RegistrationResponse> firstFuture =
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			try {
+				firstFuture.get();
+				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+			} catch (Exception e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			// second registration after timeout is with no delay, expecting it to be succeeded
+			rpcService.setConnectionDelayMillis(0);
+			CompletableFuture<RegistrationResponse> secondFuture =
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = secondFuture.get();
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// on success, send slot report for taskmanager registration
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+			rmGateway.sendSlotReport(taskExecutorResourceID,
+				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+			// wait enough for the first registration's connection delay to be over letting its remaining part go through
+			Thread.sleep(additionalDelayMillis * 2);
+
+			// verify that the latest registration is valid not being unregistered by the delayed one
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				TIMEOUT).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+		} finally {
+			rpcService.setConnectionDelayMillis(0L);
+		}
+	}
+
+	/**
 	 * Tests that a TaskExecutor can disconnect from the {@link ResourceManager}.
 	 */
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f42f09c..6d12266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,9 @@ public class TestingRpcService extends AkkaRpcService {
 	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
+	/** Artificial delay on connection */
+	private long connectionDelayMillis;
+
 	/**
 	 * Creates a new {@code TestingRpcService}.
 	 */
@@ -99,6 +103,21 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+		if (connectionDelayMillis <= 0) {
+			return CompletableFuture.completedFuture(gateway);
+		} else {
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						Thread.sleep(connectionDelayMillis);
+					} catch (InterruptedException ignored) {}
+					return gateway;
+				},
+				TestingUtils.defaultExecutor());
+		}
+	}
+
 	@Override
 	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
@@ -107,7 +126,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -127,7 +146,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -139,4 +158,8 @@ public class TestingRpcService extends AkkaRpcService {
 	public void clearGateways() {
 		registeredConnections.clear();
 	}
+
+	public void setConnectionDelayMillis(long connectionDelayMillis) {
+		this.connectionDelayMillis = connectionDelayMillis;
+	}
 }