You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/19 22:57:02 UTC

[flink] 03/04: [FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to annotated tag release-1.9.0-rc3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04e95278777611519f5d14813dec4cbc533e2934
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Fri Aug 16 21:10:18 2019 +0800

    [FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 38 ++++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b295dd..5b05be3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -597,7 +597,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			if (taskAdded) {
 				task.startTaskThread();
 
-				setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
+				setupResultPartitionBookkeeping(
+					tdd.getJobId(),
+					tdd.getProducedPartitions(),
+					task.getTerminationFuture());
 				return CompletableFuture.completedFuture(Acknowledge.get());
 			} else {
 				final String message = "TaskManager already contains a task for id " +
@@ -611,31 +614,27 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture<ExecutionState> terminationFuture) {
-		final List<ResultPartitionID> partitionsRequiringRelease = tdd.getProducedPartitions().stream()
-			// only blocking partitions require explicit release call
-			.filter(d -> d.getPartitionType().isBlocking())
-			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
-			// partitions without local resources don't store anything on the TaskExecutor
-			.filter(d -> d.storesLocalResourcesOn().isPresent())
-			.map(ShuffleDescriptor::getResultPartitionID)
-			.collect(Collectors.toList());
+	private void setupResultPartitionBookkeeping(
+			JobID jobId,
+			Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions,
+			CompletableFuture<ExecutionState> terminationFuture) {
+		final List<ResultPartitionID> partitionsRequiringRelease = filterPartitionsRequiringRelease(producedResultPartitions);
 
-		partitionTable.startTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+		partitionTable.startTrackingPartitions(jobId, partitionsRequiringRelease);
 
 		final CompletableFuture<ExecutionState> taskTerminationWithResourceCleanupFuture =
 			terminationFuture.thenApplyAsync(
 				executionState -> {
 					if (executionState != ExecutionState.FINISHED) {
-						partitionTable.stopTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease);
+						partitionTable.stopTrackingPartitions(jobId, partitionsRequiringRelease);
 					}
 					return executionState;
 				},
 				getMainThreadExecutor());
 
 		taskResultPartitionCleanupFuturesPerJob.compute(
-			tdd.getJobId(),
-			(jobID, completableFutures) -> {
+			jobId,
+			(ignored, completableFutures) -> {
 				if (completableFutures == null) {
 					completableFutures = new ArrayList<>(4);
 				}
@@ -645,6 +644,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			});
 	}
 
+	private List<ResultPartitionID> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions) {
+		return producedResultPartitions.stream()
+			// only blocking partitions require explicit release call
+			.filter(d -> d.getPartitionType().isBlocking())
+			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+			// partitions without local resources don't store anything on the TaskExecutor
+			.filter(d -> d.storesLocalResourcesOn().isPresent())
+			.map(ShuffleDescriptor::getResultPartitionID)
+			.collect(Collectors.toList());
+	}
+
 	@Override
 	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		final Task task = taskSlotTable.getTask(executionAttemptID);