You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/19 11:42:40 UTC
[flink] branch release-1.9 updated: [FLINK-13752] Only references
necessary variables when bookkeeping result partitions on TM
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 51e50ab [FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM
51e50ab is described below
commit 51e50aba23e9d62f66fa3274485d2bad05478841
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Fri Aug 16 21:10:18 2019 +0800
[FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM
---
.../flink/runtime/taskexecutor/TaskExecutor.java | 38 ++++++++++++++--------
1 file changed, 24 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 621ef68..c8bcaf9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -618,7 +618,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
task.startTaskThread();
taskCompletionTracker.trackTaskCompletion(task);
- setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
+ setupResultPartitionBookkeeping(
+ tdd.getJobId(),
+ tdd.getProducedPartitions(),
+ task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager already contains a task for id " +
@@ -632,31 +635,27 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
}
- private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture<ExecutionState> terminationFuture) {
- final List<ResultPartitionID> partitionsRequiringRelease = tdd.getProducedPartitions().stream()
- // only blocking partitions require explicit release call
- .filter(d -> d.getPartitionType().isBlocking())
- .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
- // partitions without local resources don't store anything on the TaskExecutor
- .filter(d -> d.storesLocalResourcesOn().isPresent())
- .map(ShuffleDescriptor::getResultPartitionID)
- .collect(Collectors.toList());
+ private void setupResultPartitionBookkeeping(
+ JobID jobId,
+ Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions,
+ CompletableFuture<ExecutionState> terminationFuture) {
+ final List<ResultPartitionID> partitionsRequiringRelease = filterPartitionsRequiringRelease(producedResultPartitions);
- partitionTable.startTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+ partitionTable.startTrackingPartitions(jobId, partitionsRequiringRelease);
final CompletableFuture<ExecutionState> taskTerminationWithResourceCleanupFuture =
terminationFuture.thenApplyAsync(
executionState -> {
if (executionState != ExecutionState.FINISHED) {
- partitionTable.stopTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+ partitionTable.stopTrackingPartitions(jobId, partitionsRequiringRelease);
}
return executionState;
},
getMainThreadExecutor());
taskResultPartitionCleanupFuturesPerJob.compute(
- tdd.getJobId(),
- (jobID, completableFutures) -> {
+ jobId,
+ (ignored, completableFutures) -> {
if (completableFutures == null) {
completableFutures = new ArrayList<>(4);
}
@@ -666,6 +665,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
});
}
+ private List<ResultPartitionID> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions) {
+ return producedResultPartitions.stream()
+ // only blocking partitions require explicit release call
+ .filter(d -> d.getPartitionType().isBlocking())
+ .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+ // partitions without local resources don't store anything on the TaskExecutor
+ .filter(d -> d.storesLocalResourcesOn().isPresent())
+ .map(ShuffleDescriptor::getResultPartitionID)
+ .collect(Collectors.toList());
+ }
+
@Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
final Task task = taskSlotTable.getTask(executionAttemptID);