You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/18 14:20:25 UTC
[flink] 01/02: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cee8a38c7cb72a41c6d9ff5a128a279721225fe9
Author: blueszheng <ki...@163.com>
AuthorDate: Wed Feb 20 02:08:30 2019 +0800
[FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
This closes #9072.
This closes #7757.
---
.../flink/runtime/taskexecutor/TaskExecutor.java | 65 ++++++-
.../runtime/taskexecutor/TaskExecutorTest.java | 209 ++++++++++++++++++++-
2 files changed, 258 insertions(+), 16 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b295dd..621ef68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -132,6 +132,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -182,6 +184,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
+ private final TaskCompletionTracker taskCompletionTracker;
+
// --------- job manager connections -----------
private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
@@ -273,6 +277,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
this.currentRegistrationTimeoutId = null;
this.stackTraceSampleService = new StackTraceSampleService(rpcService.getScheduledExecutor());
+ this.taskCompletionTracker = new TaskCompletionTracker();
}
@Override
@@ -333,31 +338,46 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public CompletableFuture<Void> onStop() {
log.info("Stopping TaskExecutor {}.", getAddress());
- Throwable throwable = null;
+ Throwable jobManagerDisconnectThrowable = null;
if (resourceManagerConnection != null) {
resourceManagerConnection.close();
}
+ FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
try {
- disassociateFromJobManager(jobManagerConnection, new FlinkException("The TaskExecutor is shutting down."));
+ disassociateFromJobManager(jobManagerConnection, cause);
} catch (Throwable t) {
- throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
+ jobManagerDisconnectThrowable = ExceptionUtils.firstOrSuppressed(t, jobManagerDisconnectThrowable);
}
}
- try {
- stopTaskExecutorServices();
- } catch (Exception e) {
- throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
+ final Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;
+
+ return FutureUtils
+ .runAfterwards(
+ taskCompletionTracker.failIncompleteTasksAndGetTerminationFuture(),
+ this::stopTaskExecutorServices)
+ .handle((ignored, throwable) -> {
+ handleOnStopException(throwableBeforeTasksCompletion, throwable);
+ return null;
+ });
+ }
+
+ private void handleOnStopException(Throwable throwableBeforeTasksCompletion, Throwable throwableAfterTasksCompletion) {
+ final Throwable throwable;
+
+ if (throwableBeforeTasksCompletion != null) {
+ throwable = ExceptionUtils.firstOrSuppressed(throwableBeforeTasksCompletion, throwableAfterTasksCompletion);
+ } else {
+ throwable = throwableAfterTasksCompletion;
}
if (throwable != null) {
- return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
+ throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
} else {
log.info("Stopped TaskExecutor {}.", getAddress());
- return CompletableFuture.completedFuture(null);
}
}
@@ -596,6 +616,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
if (taskAdded) {
task.startTaskThread();
+ taskCompletionTracker.trackTaskCompletion(task);
setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1826,4 +1847,30 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
return taskSlotTable.createSlotReport(getResourceID());
}
}
+
+ private static class TaskCompletionTracker {
+ private final Map<ExecutionAttemptID, Task> incompleteTasks;
+
+ private TaskCompletionTracker() {
+ incompleteTasks = new ConcurrentHashMap<>(8);
+ }
+
+ void trackTaskCompletion(Task task) {
+ incompleteTasks.put(task.getExecutionId(), task);
+ task.getTerminationFuture().thenRun(() -> incompleteTasks.remove(task.getExecutionId()));
+ }
+
+ CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
+ FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+ return FutureUtils.waitForAll(
+ incompleteTasks
+ .values()
+ .stream()
+ .map(task -> {
+ task.failExternally(cause);
+ return task.getTerminationFuture();
+ })
+ .collect(Collectors.toList()));
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index c7b8e12..bc6b296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -40,9 +40,12 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -77,6 +80,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RegistrationResponse.Decline;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -85,9 +89,11 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment.Builder;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -99,6 +105,7 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -139,6 +146,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -760,6 +768,141 @@ public class TaskExecutorTest extends TestLogger {
}
}
+ @Test
+ public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
+ final JobMasterId jobMasterId = JobMasterId.generate();
+ final AllocationID allocationId = new AllocationID();
+ final TaskDeploymentDescriptor taskDeploymentDescriptor =
+ createTaskDeploymentDescriptor(TestInterruptableInvokable.class, allocationId);
+
+ final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId);
+ final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
+
+ try {
+ taskExecutor.start();
+
+ final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+ final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
+ requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
+
+ taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
+
+ TestInterruptableInvokable.STARTED_FUTURE.get();
+ } finally {
+ taskExecutor.closeAsync();
+ }
+
+ // check task has been interrupted
+ TestInterruptableInvokable.INTERRUPTED_FUTURE.get();
+
+ // check task executor is waiting for the task completion and has not terminated yet
+ final CompletableFuture<Void> taskExecutorTerminationFuture = taskExecutor.getTerminationFuture();
+ assertThat(taskExecutorTerminationFuture.isDone(), is(false));
+
+ // check task executor has exited after task completion
+ TestInterruptableInvokable.DONE_FUTURE.complete(null);
+ taskExecutorTerminationFuture.get();
+ }
+
+ private void requestSlotFromTaskExecutor(
+ TaskExecutorGateway taskExecutorGateway,
+ JobMasterGateway jobMasterGateway,
+ AllocationID allocationId) throws ExecutionException, InterruptedException {
+ final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
+ new CompletableFuture<>();
+ ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
+ initialSlotReportFuture.get();
+
+ taskExecutorGateway
+ .requestSlot(
+ new SlotID(ResourceID.generate(), 0),
+ jobId,
+ allocationId,
+ jobMasterGateway.getAddress(),
+ resourceManagerId,
+ timeout)
+ .get();
+
+ // now inform the task manager about the new job leader
+ jobManagerLeaderRetriever.notifyListener(
+ jobMasterGateway.getAddress(),
+ jobMasterGateway.getFencingToken().toUUID());
+ }
+
+ private ResourceManagerId createAndRegisterResourceManager(
+ CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture) {
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
+ initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ });
+ rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+
+ // tell the task manager about the rm leader
+ resourceManagerLeaderRetriever.notifyListener(
+ resourceManagerGateway.getAddress(),
+ resourceManagerGateway.getFencingToken().toUUID());
+
+ return resourceManagerGateway.getFencingToken();
+ }
+
+ private TaskExecutor createTaskExecutorWithJobManagerTable(JobManagerTable jobManagerTable) throws IOException {
+ final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
+ return createTaskExecutor(new TaskManagerServicesBuilder()
+ .setTaskSlotTable(new TaskSlotTable(Collections.singletonList(ResourceProfile.UNKNOWN), timerService))
+ .setJobManagerTable(jobManagerTable)
+ .setTaskStateManager(localStateStoresManager)
+ .build());
+ }
+
+ private JobManagerTable createJobManagerTableWithOneJob(JobMasterId jobMasterId) {
+ final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
+ .setFencingTokenSupplier(() -> jobMasterId)
+ .setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
+ .build();
+ rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+ final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+ jobId,
+ ResourceID.generate(),
+ jobMasterGateway,
+ new NoOpTaskManagerActions(),
+ new TestCheckpointResponder(),
+ new TestGlobalAggregateManager(),
+ ContextClassLoaderLibraryCacheManager.INSTANCE,
+ new NoOpResultPartitionConsumableNotifier(),
+ (j, i, r) -> CompletableFuture.completedFuture(null));
+
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ jobManagerTable.put(jobId, jobManagerConnection);
+ return jobManagerTable;
+ }
+
+ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+ final Class<? extends AbstractInvokable> invokableClass,
+ final AllocationID allocationId) throws IOException {
+ final TaskInformation taskInformation = new TaskInformation(
+ new JobVertexID(),
+ "test task",
+ 1,
+ 1,
+ invokableClass.getName(),
+ new Configuration());
+
+ return new TaskDeploymentDescriptor(
+ jobId,
+ new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, testName.getMethodName()))),
+ new NonOffloaded<>(new SerializedValue<>(taskInformation)),
+ new ExecutionAttemptID(),
+ allocationId,
+ 0,
+ 0,
+ 0,
+ null,
+ Collections.emptyList(),
+ Collections.emptyList());
+ }
+
/**
* Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
* the job leader, it will offer all reserved slots to the JobManager.
@@ -1017,8 +1160,8 @@ public class TaskExecutorTest extends TestLogger {
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
jobId,
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
+ new NonOffloaded<>(serializedJobInformation),
+ new NonOffloaded<>(serializedJobVertexInformation),
new ExecutionAttemptID(),
allocationId1,
0,
@@ -1284,7 +1427,7 @@ public class TaskExecutorTest extends TestLogger {
new ClusterInformation("localhost", 1234)));
} else {
secondRegistration.trigger();
- return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
+ return CompletableFuture.completedFuture(new Decline("Only the first registration should succeed."));
}
}
);
@@ -1642,7 +1785,7 @@ public class TaskExecutorTest extends TestLogger {
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
try (TaskSubmissionTestEnvironment env =
- new TaskSubmissionTestEnvironment.Builder(jobId)
+ new Builder(jobId)
.setConfiguration(config)
.setLocalCommunication(false)
.build()) {
@@ -1659,7 +1802,7 @@ public class TaskExecutorTest extends TestLogger {
@Test(timeout = 10000L)
public void testTerminationOnFatalError() throws Throwable {
- try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId).build()) {
+ try (TaskSubmissionTestEnvironment env = new Builder(jobId).build()) {
String testExceptionMsg = "Test exception of fatal error.";
env.getTaskExecutor().onFatalError(new Exception(testExceptionMsg));
@@ -1901,12 +2044,12 @@ public class TaskExecutorTest extends TestLogger {
}
@Override
- public void start(LeaderRetrievalListener listener) throws Exception {
+ public void start(LeaderRetrievalListener listener) {
startFuture.complete(listener);
}
@Override
- public void stop() throws Exception {
+ public void stop() {
stopFuture.complete(null);
}
}
@@ -2038,4 +2181,56 @@ public class TaskExecutorTest extends TestLogger {
return result;
}
}
+
+ /**
+ * Test invokable which completes the given future when interrupted (can be used only once).
+ */
+ public static class TestInterruptableInvokable extends AbstractInvokable {
+ private static final CompletableFuture<Void> INTERRUPTED_FUTURE = new CompletableFuture<>();
+ private static final CompletableFuture<Void> STARTED_FUTURE = new CompletableFuture<>();
+ private static final CompletableFuture<Void> DONE_FUTURE = new CompletableFuture<>();
+
+ public TestInterruptableInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() {
+ STARTED_FUTURE.complete(null);
+
+ try {
+ INTERRUPTED_FUTURE.get();
+ } catch (InterruptedException e) {
+ INTERRUPTED_FUTURE.complete(null);
+ } catch (ExecutionException e) {
+ ExceptionUtils.rethrow(e);
+ }
+
+ try {
+ DONE_FUTURE.get();
+ } catch (ExecutionException | InterruptedException e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
+ }
+
+ /**
+ * {@link TaskSlotTable} which completes the given future when it is started.
+ */
+ private static class TaskSlotTableWithStartFuture extends TaskSlotTable {
+ private final CompletableFuture<Void> taskSlotTableStarted;
+
+ private TaskSlotTableWithStartFuture(
+ CompletableFuture<Void> taskSlotTableStarted,
+ TimerService<AllocationID> timerService) {
+ super(Collections.singletonList(ResourceProfile.UNKNOWN), timerService);
+ this.taskSlotTableStarted = taskSlotTableStarted;
+ }
+
+ @Override
+ public void start(SlotActions initialSlotActions) {
+ super.start(initialSlotActions);
+ taskSlotTableStarted.complete(null);
+ }
+ }
}