You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:18:55 UTC

[flink] 01/02: [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bcd35b9b51e96b231bc64d3b45583bfcf47c3d18
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700

    [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
    
    TaskExecutor registration is an asynchronous process, so a retry issued
    after a timeout can be processed before the earlier request. When the
    delayed, timed-out request is processed later, it can accidentally
    unregister a valid task manager, whose slots are then never reported to
    the job manager. This patch tracks the ongoing task executor connection
    future per resource ID and ignores outdated ones to prevent such a race.
---
 .../runtime/resourcemanager/ResourceManager.java   | 29 +++++---
 .../ResourceManagerTaskExecutorTest.java           | 85 ++++++++++++++++++----
 .../flink/runtime/rpc/TestingRpcService.java       | 33 +++++++--
 3 files changed, 116 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f90f63a..eca7f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -117,6 +117,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All currently registered TaskExecutors with there framework specific worker information. */
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
 
+	/** Latest pending TaskExecutor connection per resource ID; only this future may complete a registration. */
+	private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
 	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -193,10 +196,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		this.jmResourceIdRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
 		infoMessageListeners = new ConcurrentHashMap<>(8);
+		this.taskExecutorGatewayFutures = new HashMap<>(8);
 	}
 
-
-
 	// ------------------------------------------------------------------------
 	//  RPC lifecycle methods
 	// ------------------------------------------------------------------------
@@ -356,18 +358,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			final Time timeout) {
 
 		CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+		taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
 
 		return taskExecutorGatewayFuture.handleAsync(
 			(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
+				if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+					taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						return registerTaskExecutorInternal(
+							taskExecutorGateway,
+							taskExecutorAddress,
+							taskExecutorResourceId,
+							dataPort,
+							hardwareDescription);
+					}
 				} else {
-					return registerTaskExecutorInternal(
-						taskExecutorGateway,
-						taskExecutorAddress,
-						taskExecutorResourceId,
-						dataPort,
-						hardwareDescription);
+					log.info("Ignoring outdated TaskExecutorGateway connection for {}.", taskExecutorResourceId);
+					return new RegistrationResponse.Decline("Decline outdated task executor registration.");
 				}
 			},
 			getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index c3878ff..d1532a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -37,12 +39,15 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,9 +67,11 @@ import static org.mockito.Mockito.when;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
 
-	private final Time timeout = Time.seconds(10L);
+	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private TestingRpcService rpcService;
+	private static final long HEARTBEAT_TIMEOUT = 5000;
+
+	private static TestingRpcService rpcService;
 
 	private int dataPort = 1234;
 
@@ -95,14 +102,14 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
 		rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
-			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-		grantLeadership(rmLeaderElectionService).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		grantLeadership(rmLeaderElectionService).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
 	@After
 	public void teardown() throws Exception {
-		RpcUtils.terminateRpcService(rpcService, timeout);
+		RpcUtils.terminateRpcService(rpcService, TIMEOUT);
 	}
 
 	/**
@@ -114,17 +121,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
-			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
 				taskExecutorResourceID,
-				timeout).get();
+				TIMEOUT).get();
 			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -136,17 +143,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive registration with unmatched leadershipId from task executor
+	 * Test receive registration with unmatched leadershipId from task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
+				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 
 			try {
-				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 				fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
 			} catch (ExecutionException e) {
 				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
@@ -159,7 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive registration with invalid address from task executor
+	 * Test receive registration with invalid address from task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
@@ -167,8 +174,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
-			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
+				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			assertTrue(invalidAddressFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();
@@ -187,7 +194,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 
 		SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -224,4 +231,50 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		return leaderElectionService.isLeader(leaderSessionId);
 	}
 
+	/**
+	 * Tests that a timed-out registration whose connection to the task executor completes late does not unregister
+	 * a task executor that has meanwhile registered successfully through a retry.
+	 */
+	@Test
+	public void testDelayedRegisterTaskExecutor() throws Exception {
+		// additional connection delay on top of the RPC timeout
+		// keep it well below half the heartbeat timeout so that waiting twice this long below cannot hit the heartbeat timeout
+		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		try {
+			// first registration: the connection delay exceeds the RPC timeout, so the request must time out
+			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			CompletableFuture<RegistrationResponse> firstFuture =
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			try {
+				firstFuture.get();
+				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			// second registration (the retry) without any delay, expecting it to succeed
+			rpcService.setConnectionDelayMillis(0);
+			CompletableFuture<RegistrationResponse> secondFuture =
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = secondFuture.get();
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// on success, send slot report for taskmanager registration
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+			rmGateway.sendSlotReport(taskExecutorResourceID,
+				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+			// wait long enough for the first registration's delayed connection to complete and its remaining part to run
+			Thread.sleep(additionalDelayMillis * 2);
+
+			// verify that the latest registration is still valid and was not unregistered by the delayed one
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				TIMEOUT).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+		} finally {
+			rpcService.setConnectionDelayMillis(0L);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..4f1ae66a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -52,18 +53,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
-	/** Map of pre-registered connections */
+	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
+	/** Artificial connect delay in ms (<= 0 disables it); volatile because tests set it while RPC threads read it. */
+	private volatile long connectionDelayMillis;
+
 	/**
-	 * Creates a new {@code TestingRpcService}. 
+	 * Creates a new {@code TestingRpcService}.
 	 */
 	public TestingRpcService() {
 		this(new Configuration());
 	}
 
 	/**
-	 * Creates a new {@code TestingRpcService}, using the given configuration. 
+	 * Creates a new {@code TestingRpcService}, using the given configuration.
 	 */
 	public TestingRpcService(Configuration configuration) {
 		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
@@ -98,6 +102,21 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+		final long delayMillis = connectionDelayMillis;
+		if (delayMillis <= 0) {
+			return CompletableFuture.completedFuture(gateway);
+		}
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				Thread.sleep(delayMillis);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+			return gateway;
+		}, TestingUtils.defaultExecutor());
+	}
+
 	@Override
 	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
@@ -106,7 +125,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -126,7 +145,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -138,4 +157,8 @@ public class TestingRpcService extends AkkaRpcService {
 	public void clearGateways() {
 		registeredConnections.clear();
 	}
+
+	public void setConnectionDelayMillis(long connectionDelayMillis) {
+		this.connectionDelayMillis = connectionDelayMillis;
+	}
 }