You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/19 22:57:02 UTC
[flink] 03/04: [FLINK-13752] Only references necessary variables
when bookkeeping result partitions on TM
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to annotated tag release-1.9.0-rc3
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04e95278777611519f5d14813dec4cbc533e2934
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Fri Aug 16 21:10:18 2019 +0800
[FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM
---
.../flink/runtime/taskexecutor/TaskExecutor.java | 38 ++++++++++++++--------
1 file changed, 24 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b295dd..5b05be3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -597,7 +597,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
if (taskAdded) {
task.startTaskThread();
- setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
+ setupResultPartitionBookkeeping(
+ tdd.getJobId(),
+ tdd.getProducedPartitions(),
+ task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager already contains a task for id " +
@@ -611,31 +614,27 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
}
- private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture<ExecutionState> terminationFuture) {
- final List<ResultPartitionID> partitionsRequiringRelease = tdd.getProducedPartitions().stream()
- // only blocking partitions require explicit release call
- .filter(d -> d.getPartitionType().isBlocking())
- .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
- // partitions without local resources don't store anything on the TaskExecutor
- .filter(d -> d.storesLocalResourcesOn().isPresent())
- .map(ShuffleDescriptor::getResultPartitionID)
- .collect(Collectors.toList());
+ private void setupResultPartitionBookkeeping(
+ JobID jobId,
+ Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions,
+ CompletableFuture<ExecutionState> terminationFuture) {
+ final List<ResultPartitionID> partitionsRequiringRelease = filterPartitionsRequiringRelease(producedResultPartitions);
- partitionTable.startTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+ partitionTable.startTrackingPartitions(jobId, partitionsRequiringRelease);
final CompletableFuture<ExecutionState> taskTerminationWithResourceCleanupFuture =
terminationFuture.thenApplyAsync(
executionState -> {
if (executionState != ExecutionState.FINISHED) {
- partitionTable.stopTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+ partitionTable.stopTrackingPartitions(jobId, partitionsRequiringRelease);
}
return executionState;
},
getMainThreadExecutor());
taskResultPartitionCleanupFuturesPerJob.compute(
- tdd.getJobId(),
- (jobID, completableFutures) -> {
+ jobId,
+ (ignored, completableFutures) -> {
if (completableFutures == null) {
completableFutures = new ArrayList<>(4);
}
@@ -645,6 +644,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
});
}
+ private List<ResultPartitionID> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions) {
+ return producedResultPartitions.stream()
+ // only blocking partitions require explicit release call
+ .filter(d -> d.getPartitionType().isBlocking())
+ .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+ // partitions without local resources don't store anything on the TaskExecutor
+ .filter(d -> d.storesLocalResourcesOn().isPresent())
+ .map(ShuffleDescriptor::getResultPartitionID)
+ .collect(Collectors.toList());
+ }
+
@Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
final Task task = taskSlotTable.getTask(executionAttemptID);