You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/18 14:20:25 UTC

[flink] 01/02: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cee8a38c7cb72a41c6d9ff5a128a279721225fe9
Author: blueszheng <ki...@163.com>
AuthorDate: Wed Feb 20 02:08:30 2019 +0800

    [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
    
    This closes #9072.
    This closes #7757.
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  65 ++++++-
 .../runtime/taskexecutor/TaskExecutorTest.java     | 209 ++++++++++++++++++++-
 2 files changed, 258 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b295dd..621ef68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -132,6 +132,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -182,6 +184,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	/** The kvState registration service in the task manager. */
 	private final KvStateService kvStateService;
 
+	private final TaskCompletionTracker taskCompletionTracker;
+
 	// --------- job manager connections -----------
 
 	private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
@@ -273,6 +277,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		this.currentRegistrationTimeoutId = null;
 
 		this.stackTraceSampleService = new StackTraceSampleService(rpcService.getScheduledExecutor());
+		this.taskCompletionTracker = new TaskCompletionTracker();
 	}
 
 	@Override
@@ -333,31 +338,57 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	public CompletableFuture<Void> onStop() {
 		log.info("Stopping TaskExecutor {}.", getAddress());

-		Throwable throwable = null;
+		Throwable jobManagerDisconnectThrowable = null;

 		if (resourceManagerConnection != null) {
 			resourceManagerConnection.close();
 		}

+		FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
 		for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
 			try {
-				disassociateFromJobManager(jobManagerConnection, new FlinkException("The TaskExecutor is shutting down."));
+				disassociateFromJobManager(jobManagerConnection, cause);
 			} catch (Throwable t) {
-				throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
+				jobManagerDisconnectThrowable = ExceptionUtils.firstOrSuppressed(t, jobManagerDisconnectThrowable);
 			}
 		}

-		try {
-			stopTaskExecutorServices();
-		} catch (Exception e) {
-			throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
+		final Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;
+
+		// Fail all tasks which have not terminated yet and stop the TaskExecutor services
+		// only after the termination futures of all these tasks have completed.
+		return FutureUtils
+			.runAfterwards(
+				taskCompletionTracker.failIncompleteTasksAndGetTerminationFuture(),
+				this::stopTaskExecutorServices)
+			.handle((ignored, throwable) -> {
+				handleOnStopException(throwableBeforeTasksCompletion, throwable);
+				return null;
+			});
+	}
+
+	/**
+	 * Reports the outcome of {@link #onStop()}: logs a successful stop or fails with the collected errors.
+	 * An error raised before the tasks completed is kept as the primary cause, and a later error is
+	 * added to it as a suppressed exception.
+	 *
+	 * @param throwableBeforeTasksCompletion error raised while disassociating from the job managers, or null
+	 * @param throwableAfterTasksCompletion error raised while waiting for the tasks or stopping the services, or null
+	 * @throws CompletionException wrapping a {@link FlinkException} if either of the two errors is not null
+	 */
+	private void handleOnStopException(Throwable throwableBeforeTasksCompletion, Throwable throwableAfterTasksCompletion) {
+		final Throwable throwable;
+
+		if (throwableAfterTasksCompletion != null) {
+			throwable = ExceptionUtils.firstOrSuppressed(throwableAfterTasksCompletion, throwableBeforeTasksCompletion);
+		} else {
+			throwable = throwableBeforeTasksCompletion;
 		}

 		if (throwable != null) {
-			return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
+			throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
 		} else {
 			log.info("Stopped TaskExecutor {}.", getAddress());
-			return CompletableFuture.completedFuture(null);
 		}
 	}
 
@@ -596,6 +616,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 			if (taskAdded) {
 				task.startTaskThread();
+				taskCompletionTracker.trackTaskCompletion(task);
 
 				setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
 				return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1826,4 +1847,45 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			return taskSlotTable.createSlotReport(getResourceID());
 		}
 	}
+
+	/**
+	 * Tracks the tasks started by this TaskExecutor which have not terminated yet, so that they
+	 * can be failed and awaited when the TaskExecutor shuts down.
+	 */
+	private static class TaskCompletionTracker {
+		private final Map<ExecutionAttemptID, Task> incompleteTasks;
+
+		private TaskCompletionTracker() {
+			incompleteTasks = new ConcurrentHashMap<>(8);
+		}
+
+		/**
+		 * Starts tracking the given task. The task is removed again once its termination future
+		 * completes, no matter whether normally or exceptionally.
+		 */
+		void trackTaskCompletion(Task task) {
+			final ExecutionAttemptID executionId = task.getExecutionId();
+			incompleteTasks.put(executionId, task);
+			task.getTerminationFuture().whenComplete((ignored, failure) -> incompleteTasks.remove(executionId));
+		}
+
+		/**
+		 * Fails all tracked tasks which have not terminated yet.
+		 *
+		 * @return future which completes once the termination futures of all these tasks have completed
+		 */
+		CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
+			final FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+			// Fail first and collect the termination futures afterwards to keep the stream free of
+			// side effects. A task which terminates in between removes itself from the map, which is
+			// fine since there is nothing left to wait for.
+			incompleteTasks.values().forEach(task -> task.failExternally(cause));
+			return FutureUtils.waitForAll(
+				incompleteTasks
+					.values()
+					.stream()
+					.map(Task::getTerminationFuture)
+					.collect(Collectors.toList()));
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index c7b8e12..bc6b296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -40,9 +40,12 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -77,6 +80,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RegistrationResponse.Decline;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -85,9 +89,11 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment.Builder;
 import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
 import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -99,6 +105,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -139,6 +146,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -760,6 +768,165 @@ public class TaskExecutorTest extends TestLogger {
 		}
 	}

+	/**
+	 * Tests that shutting down the TaskExecutor interrupts a running task and that the
+	 * TaskExecutor only terminates once this task has completed.
+	 */
+	@Test(timeout = 10000L)
+	public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final AllocationID allocationId = new AllocationID();
+		final TaskDeploymentDescriptor taskDeploymentDescriptor =
+			createTaskDeploymentDescriptor(TestInterruptableInvokable.class, allocationId);
+
+		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId);
+		final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
+
+		try {
+			taskExecutor.start();
+
+			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+			final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
+			requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
+
+			taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
+
+			TestInterruptableInvokable.STARTED_FUTURE.get();
+		} finally {
+			taskExecutor.closeAsync();
+		}
+
+		// check task has been interrupted
+		TestInterruptableInvokable.INTERRUPTED_FUTURE.get();
+
+		// check task executor is waiting for the task completion and has not terminated yet
+		final CompletableFuture<Void> taskExecutorTerminationFuture = taskExecutor.getTerminationFuture();
+		assertThat(taskExecutorTerminationFuture.isDone(), is(false));
+
+		// check task executor has exited after task completion
+		TestInterruptableInvokable.DONE_FUTURE.complete(null);
+		taskExecutorTerminationFuture.get();
+	}
+
+	/**
+	 * Registers a testing ResourceManager and waits for the initial slot report, requests a slot for
+	 * the given allocation on behalf of the given JobMaster and then announces it as job leader.
+	 */
+	private void requestSlotFromTaskExecutor(
+			TaskExecutorGateway taskExecutorGateway,
+			JobMasterGateway jobMasterGateway,
+			AllocationID allocationId) throws ExecutionException, InterruptedException {
+		final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
+			new CompletableFuture<>();
+		final ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
+		initialSlotReportFuture.get();
+
+		taskExecutorGateway
+			.requestSlot(
+				new SlotID(ResourceID.generate(), 0),
+				jobId,
+				allocationId,
+				jobMasterGateway.getAddress(),
+				resourceManagerId,
+				timeout)
+			.get();
+
+		// now inform the task manager about the new job leader
+		jobManagerLeaderRetriever.notifyListener(
+			jobMasterGateway.getAddress(),
+			jobMasterGateway.getFencingToken().toUUID());
+	}
+
+	/**
+	 * Registers a testing ResourceManager with the RPC service and announces it as leader. The given
+	 * future is completed with the first slot report the ResourceManager receives.
+	 *
+	 * @return the fencing token of the registered ResourceManager
+	 */
+	private ResourceManagerId createAndRegisterResourceManager(
+			CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture) {
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
+			initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		});
+		rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+
+		// tell the task manager about the rm leader
+		resourceManagerLeaderRetriever.notifyListener(
+			resourceManagerGateway.getAddress(),
+			resourceManagerGateway.getFencingToken().toUUID());
+
+		return resourceManagerGateway.getFencingToken();
+	}
+
+	/**
+	 * Creates a TaskExecutor with a single slot of unknown resource profile and the given job manager table.
+	 */
+	private TaskExecutor createTaskExecutorWithJobManagerTable(JobManagerTable jobManagerTable) throws IOException {
+		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
+		return createTaskExecutor(new TaskManagerServicesBuilder()
+			.setTaskSlotTable(new TaskSlotTable(Collections.singletonList(ResourceProfile.UNKNOWN), timerService))
+			.setJobManagerTable(jobManagerTable)
+			.setTaskStateManager(localStateStoresManager)
+			.build());
+	}
+
+	/**
+	 * Creates a job manager table with a connection for {@code jobId} to a testing JobMaster which is
+	 * registered with the RPC service and accepts all offered slots.
+	 */
+	private JobManagerTable createJobManagerTableWithOneJob(JobMasterId jobMasterId) {
+		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
+			.setFencingTokenSupplier(() -> jobMasterId)
+			.setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
+			.build();
+		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobId,
+			ResourceID.generate(),
+			jobMasterGateway,
+			new NoOpTaskManagerActions(),
+			new TestCheckpointResponder(),
+			new TestGlobalAggregateManager(),
+			ContextClassLoaderLibraryCacheManager.INSTANCE,
+			new NoOpResultPartitionConsumableNotifier(),
+			(j, i, r) -> CompletableFuture.completedFuture(null));
+
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		jobManagerTable.put(jobId, jobManagerConnection);
+		return jobManagerTable;
+	}
+
+	/**
+	 * Creates a deployment descriptor for a task of {@code jobId} running the given invokable in the given allocation.
+	 */
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+			final Class<? extends AbstractInvokable> invokableClass,
+			final AllocationID allocationId) throws IOException {
+		final TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"test task",
+			1,
+			1,
+			invokableClass.getName(),
+			new Configuration());
+
+		return new TaskDeploymentDescriptor(
+			jobId,
+			new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, testName.getMethodName()))),
+			new NonOffloaded<>(new SerializedValue<>(taskInformation)),
+			new ExecutionAttemptID(),
+			allocationId,
+			0,
+			0,
+			0,
+			null,
+			Collections.emptyList(),
+			Collections.emptyList());
+	}
+
 	/**
 	 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
 	 * the job leader, it will offer all reserved slots to the JobManager.
@@ -1017,8 +1160,8 @@ public class TaskExecutorTest extends TestLogger {
 
 			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				jobId,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
+				new NonOffloaded<>(serializedJobInformation),
+				new NonOffloaded<>(serializedJobVertexInformation),
 				new ExecutionAttemptID(),
 				allocationId1,
 				0,
@@ -1284,7 +1427,7 @@ public class TaskExecutorTest extends TestLogger {
 							new ClusterInformation("localhost", 1234)));
 					} else {
 						secondRegistration.trigger();
-						return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
+						return CompletableFuture.completedFuture(new Decline("Only the first registration should succeed."));
 					}
 				}
 			);
@@ -1642,7 +1785,7 @@ public class TaskExecutorTest extends TestLogger {
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
 
 		try (TaskSubmissionTestEnvironment env =
-			new TaskSubmissionTestEnvironment.Builder(jobId)
+			new Builder(jobId)
 				.setConfiguration(config)
 				.setLocalCommunication(false)
 				.build()) {
@@ -1659,7 +1802,7 @@ public class TaskExecutorTest extends TestLogger {
 
 	@Test(timeout = 10000L)
 	public void testTerminationOnFatalError() throws Throwable {
-		try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId).build()) {
+		try (TaskSubmissionTestEnvironment env = new Builder(jobId).build()) {
 			String testExceptionMsg = "Test exception of fatal error.";
 
 			env.getTaskExecutor().onFatalError(new Exception(testExceptionMsg));
@@ -1901,12 +2044,14 @@ public class TaskExecutorTest extends TestLogger {
 		}

+		/** Completes the start future with the given listener. */
 		@Override
-		public void start(LeaderRetrievalListener listener) throws Exception {
+		public void start(LeaderRetrievalListener listener) {
 			startFuture.complete(listener);
 		}

+		/** Completes the stop future. */
 		@Override
-		public void stop() throws Exception {
+		public void stop() {
 			stopFuture.complete(null);
 		}
 	}
@@ -2038,4 +2181,65 @@ public class TaskExecutorTest extends TestLogger {
 			return result;
 		}
 	}
+
+	/**
+	 * Test invokable which completes {@code STARTED_FUTURE} when invoked, completes {@code INTERRUPTED_FUTURE}
+	 * once its thread is interrupted while waiting, and afterwards blocks until {@code DONE_FUTURE} is completed.
+	 * As these futures are static, the invokable can be used only once.
+	 */
+	public static class TestInterruptableInvokable extends AbstractInvokable {
+		private static final CompletableFuture<Void> INTERRUPTED_FUTURE = new CompletableFuture<>();
+		private static final CompletableFuture<Void> STARTED_FUTURE = new CompletableFuture<>();
+		private static final CompletableFuture<Void> DONE_FUTURE = new CompletableFuture<>();
+
+		public TestInterruptableInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() {
+			STARTED_FUTURE.complete(null);
+
+			try {
+				// INTERRUPTED_FUTURE is only completed below, so this waits until the thread is interrupted.
+				INTERRUPTED_FUTURE.get();
+			} catch (InterruptedException e) {
+				// The interrupted flag is not restored: otherwise the following DONE_FUTURE.get()
+				// would throw an InterruptedException instead of waiting for the test to complete it.
+				INTERRUPTED_FUTURE.complete(null);
+			} catch (ExecutionException e) {
+				ExceptionUtils.rethrow(e);
+			}
+
+			try {
+				DONE_FUTURE.get();
+			} catch (ExecutionException | InterruptedException e) {
+				ExceptionUtils.rethrow(e);
+			}
+		}
+	}
+
+	/**
+	 * {@link TaskSlotTable} with a single slot of unknown resource profile which completes the given
+	 * future after {@link TaskSlotTable#start(SlotActions)} has been called.
+	 *
+	 * <p>NOTE(review): not referenced by any test added in this change; consider removing it together
+	 * with the {@code SlotActions} import if it stays unused.
+	 */
+	private static class TaskSlotTableWithStartFuture extends TaskSlotTable {
+		private final CompletableFuture<Void> taskSlotTableStarted;
+
+		private TaskSlotTableWithStartFuture(
+				CompletableFuture<Void> taskSlotTableStarted,
+				TimerService<AllocationID> timerService) {
+			super(Collections.singletonList(ResourceProfile.UNKNOWN), timerService);
+			this.taskSlotTableStarted = taskSlotTableStarted;
+		}
+
+		@Override
+		public void start(SlotActions initialSlotActions) {
+			super.start(initialSlotActions);
+			taskSlotTableStarted.complete(null);
+		}
+	}
 }