Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:39:19 UTC

[flink] branch master updated (36fdbef -> 7b95f32)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 36fdbef  [FLINK-13427][hive] HiveCatalog's createFunction fails when function name has upper-case characters
     new fc72a7c  [hotfix][coordination] Check whether partition set to track is empty
     new 7b95f32  [FLINK-13371][coordination] Prevent leaks of blocking partitions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/executiongraph/Execution.java    |  71 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |   7 +
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../taskexecutor/partition/PartitionTable.java     |   4 +
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |   2 +-
 .../partition/PartitionTrackerImplTest.java        |   7 +-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  51 +++-
 .../taskexecutor/partition/PartitionTableTest.java |   9 +
 16 files changed, 469 insertions(+), 164 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java


[flink] 02/02: [FLINK-13371][coordination] Prevent leaks of blocking partitions

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b95f32d01730bcc75ded42e41d3668a1802a69b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 1 14:27:28 2019 +0200

    [FLINK-13371][coordination] Prevent leaks of blocking partitions
---
 .../flink/runtime/executiongraph/Execution.java    |  71 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |   7 +
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |   2 +-
 .../partition/PartitionTrackerImplTest.java        |   7 +-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  51 +++-
 14 files changed, 456 insertions(+), 164 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af94188..d8a1b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
-		processFail(t, true, userAccumulators, metrics);
+		// skip release of partitions since this is only called if the TM actually sent the FAILED state update
+		// in this case all partitions have already been cleaned up
+		processFail(t, true, userAccumulators, metrics, false);
 	}
 
 	@VisibleForTesting
@@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			else if (current == CANCELING) {
 				// we sent a cancel call, and the task manager finished before it arrived. We
 				// will never get a CANCELED call back from the job manager
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return;
 			}
 			else if (current == CANCELED || current == FAILED) {
@@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void completeCancelling() {
-		completeCancelling(null, null);
+		completeCancelling(null, null, true);
 	}
 
-	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
 		// network stack is canceled (for example by a failing / canceling receiver or sender
@@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				if (transitionState(current, CANCELED)) {
-					finishCancellation();
+					finishCancellation(releasePartitions);
 					return;
 				}
 
@@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	private void finishCancellation() {
+	private void finishCancellation(boolean releasePartitions) {
 		releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
 		vertex.getExecutionGraph().deregisterExecution(this);
-		// release partitions on TM in case the Task finished while we where already CANCELING
-		stopTrackingAndReleasePartitions();
+		handlePartitionCleanup(releasePartitions, releasePartitions);
 	}
 
 	void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// --------------------------------------------------------------------------------------------
 
 	private boolean processFail(Throwable t, boolean isCallback) {
-		return processFail(t, isCallback, null, null);
+		return processFail(t, isCallback, null, null, true);
 	}
 
-	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 
@@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 
 			if (current == CANCELING) {
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return false;
 			}
 
@@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				releaseAssignedResource(t);
 				vertex.getExecutionGraph().deregisterExecution(this);
-				stopTrackingAndReleasePartitions();
+				handlePartitionCleanup(releasePartitions, releasePartitions);
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
 					if (LOG.isDebugEnabled()) {
@@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	void stopTrackingAndReleasePartitions() {
+	void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
+		if (releasePipelinedPartitions) {
+			sendReleaseIntermediateResultPartitionsRpcCall();
+		}
+
+		final Collection<ResultPartitionID> partitionIds = getPartitionIds();
+		final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
+
+		if (!partitionIds.isEmpty()) {
+			if (releaseBlockingPartitions) {
+				LOG.info("Discarding the results produced by task execution {}.", attemptId);
+				partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+			} else {
+				partitionTracker.stopTrackingPartitions(partitionIds);
+			}
+		}
+	}
+
+	private Collection<ResultPartitionID> getPartitionIds() {
+		return producedPartitions.values().stream()
+			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+			.map(ShuffleDescriptor::getResultPartitionID)
+			.collect(Collectors.toList());
+	}
+
+	private void sendReleaseIntermediateResultPartitionsRpcCall() {
 		LOG.info("Discarding the results produced by task execution {}.", attemptId);
-		if (producedPartitions != null && producedPartitions.size() > 0) {
-			final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
-			final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream()
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			final ShuffleMaster<?> shuffleMaster = getVertex().getExecutionGraph().getShuffleMaster();
+
+			Collection<ResultPartitionID> partitionIds = producedPartitions.values().stream()
+				.filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined())
 				.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+				.peek(shuffleMaster::releasePartitionExternally)
 				.map(ShuffleDescriptor::getResultPartitionID)
 				.collect(Collectors.toList());
 
-			partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
+			if (!partitionIds.isEmpty()) {
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
+			}
 		}
 	}
 
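Taken together, the Execution.java hunks above replace the single stopTrackingAndReleasePartitions() call with two independent decisions: whether to send the releasePartitions RPC for pipelined partitions, and whether the PartitionTracker should merely forget blocking partitions or also release them. Below is a minimal, self-contained sketch of how the call sites in this commit pick the two flags; the enum, class, and stub method are invented for illustration and are not Flink code:

    public class PartitionCleanupSketch {

        // hypothetical labels for the terminal-state paths touched by this commit
        enum TerminalPath {
            FAILED_REPORTED_BY_TM,      // Execution#markFailed(t, accumulators, metrics)
            CANCELED_REPORTED_BY_TM,    // ExecutionGraph -> completeCancelling(..., false)
            CANCELED_OR_FAILED_BY_JM,   // completeCancelling() / processFail(t, isCallback)
            VERTEX_RESET_AFTER_FINISHED // ExecutionVertex#resetForNewExecution
        }

        void onTerminal(TerminalPath path) {
            switch (path) {
                case FAILED_REPORTED_BY_TM:
                case CANCELED_REPORTED_BY_TM:
                    // the TM already cleaned up all partitions; only drop the bookkeeping
                    handlePartitionCleanup(false, false);
                    break;
                case CANCELED_OR_FAILED_BY_JM:
                    // release pipelined partitions via RPC and blocking ones via the tracker
                    handlePartitionCleanup(true, true);
                    break;
                case VERTEX_RESET_AFTER_FINISHED:
                    // pipelined partitions were already released in Execution#cancel();
                    // only blocking partitions must be released here
                    handlePartitionCleanup(false, true);
                    break;
            }
        }

        // stand-in for Execution#handlePartitionCleanup from the diff above
        void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
            System.out.printf("pipelined=%b, blocking=%b%n",
                releasePipelinedPartitions, releaseBlockingPartitions);
        }
    }
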
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f779fd2..0984274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1525,7 +1525,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			case CANCELED:
 				// this deserialization is exception-free
 				accumulators = deserializeAccumulators(state);
-				attempt.completeCancelling(accumulators, state.getIOMetrics());
+				attempt.completeCancelling(accumulators, state.getIOMetrics(), false);
 				return true;
 
 			case FAILED:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 03c68f8..6d262ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -604,7 +604,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 			if (oldState.isTerminal()) {
 				if (oldState == FINISHED) {
-					oldExecution.stopTrackingAndReleasePartitions();
+					// pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
+					// do not release pipelined partitions here to save RPC calls
+					oldExecution.handlePartitionCleanup(false, true);
 					getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
 				}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 3697fb8..268f4f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -46,6 +46,11 @@ public interface PartitionTracker {
 	void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds);
 
 	/**
+	 * Stops the tracking of the given partitions.
+	 */
+	void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds);
+
+	/**
 	 * Releases all partitions for the given task executor ID, and stop the tracking of partitions that were released.
 	 */
 	void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index f772b37..2e7f421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		Preconditions.checkNotNull(resultPartitionIds);
+
+		resultPartitionIds.forEach(this::internalStopTrackingPartition);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		Preconditions.checkNotNull(producingTaskExecutorId);
 
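The tracker interface now separates forgetting partitions from forgetting and releasing them. A hedged usage sketch against the two methods shown above (the helper class itself is hypothetical; it mirrors the split made in Execution#handlePartitionCleanup):

    import java.util.Collection;

    import org.apache.flink.runtime.io.network.partition.PartitionTracker;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    final class TrackerUsageSketch {
        static void stopTracking(
                PartitionTracker tracker,
                Collection<ResultPartitionID> partitionIds,
                boolean releaseBlockingPartitions) {
            if (releaseBlockingPartitions) {
                // partitions may still occupy TM resources: forget and release them
                tracker.stopTrackingAndReleasePartitions(partitionIds);
            } else {
                // the TM already released them: only drop the bookkeeping
                tracker.stopTrackingPartitions(partitionIds);
            }
        }
    }
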
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
index ad3fc48..ec32178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
@@ -63,11 +63,11 @@ public class ProducerDescriptor {
 		this.dataPort = dataPort;
 	}
 
-	ResourceID getProducerLocation() {
+	public ResourceID getProducerLocation() {
 		return producerLocation;
 	}
 
-	ExecutionAttemptID getProducerExecutionId() {
+	public ExecutionAttemptID getProducerExecutionId() {
 		return producerExecutionId;
 	}
 
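The two getters become public so that ShuffleMaster implementations outside the org.apache.flink.runtime.shuffle package can read them; the TestingShuffleMaster added further below does exactly that. A hypothetical minimal implementation along the same lines (the class name is invented; only the getters and interface methods are taken from this commit):

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.runtime.shuffle.PartitionDescriptor;
    import org.apache.flink.runtime.shuffle.ProducerDescriptor;
    import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
    import org.apache.flink.runtime.shuffle.ShuffleMaster;

    class MinimalShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {

        @Override
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
                PartitionDescriptor partitionDescriptor,
                ProducerDescriptor producerDescriptor) {
            final ResultPartitionID id = new ResultPartitionID(
                partitionDescriptor.getPartitionId(),
                producerDescriptor.getProducerExecutionId()); // public as of this commit
            final ResourceID location =
                producerDescriptor.getProducerLocation();     // public as of this commit

            return CompletableFuture.completedFuture(new ShuffleDescriptor() {
                @Override
                public ResultPartitionID getResultPartitionID() {
                    return id;
                }

                @Override
                public Optional<ResourceID> storesLocalResourcesOn() {
                    return Optional.of(location);
                }
            });
        }

        @Override
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            // nothing to release in this sketch
        }
    }
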
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d0cfa00..d10fcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		Execution execution1 = executions.values().iterator().next();
 		execution1.cancel();
-		execution1.completeCancelling(accumulators, ioMetrics);
+		execution1.completeCancelling(accumulators, ioMetrics, false);
 
 		assertEquals(ioMetrics, execution1.getIOMetrics());
 		assertEquals(accumulators, execution1.getUserAccumulators());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
new file mode 100644
index 0000000..ab4048d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link Execution}.
+ */
+public class ExecutionPartitionLifecycleTest extends TestLogger {
+
+	@ClassRule
+	public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
+		new TestingComponentMainThreadExecutor.Resource();
+
+	private Execution execution;
+	private ResultPartitionDeploymentDescriptor descriptor;
+	private ResourceID taskExecutorResourceId;
+	private JobID jobId;
+
+	@Test
+	public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
+		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>();
+		taskManagerGateway.setReleasePartitionsConsumer(((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds))));
+
+		final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
+
+		execution.cancel();
+		assertFalse(releasePartitionsCallFuture.isDone());
+
+		execution.markFinished();
+		assertTrue(releasePartitionsCallFuture.isDone());
+
+		final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get();
+		assertEquals(jobId, releasePartitionsCall.f0);
+		assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1);
+
+		assertEquals(1, testingShuffleMaster.externallyReleasedPartitions.size());
+		assertEquals(descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll());
+	}
+
+	private enum PartitionReleaseResult {
+		NONE,
+		STOP_TRACKING,
+		STOP_TRACKING_AND_RELEASE
+	}
+
+	@Test
+	public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
+		testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+			},
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling();
+			},
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(
+				new Exception("Test exception"),
+				Collections.emptyMap(),
+				new IOMetrics(0, 0, 0, 0)),
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(new Exception("Test exception")),
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final PartitionReleaseResult partitionReleaseResult) throws Exception {
+		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setStartTrackingPartitionsConsumer(
+			(resourceID, resultPartitionDeploymentDescriptor) ->
+				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
+		);
+		partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE);
+
+		Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
+		assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
+		assertThat(startTrackingCall.f1, equalTo(descriptor));
+
+		stateTransition.accept(execution);
+
+		switch (partitionReleaseResult) {
+			case NONE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				break;
+			case STOP_TRACKING:
+				assertTrue(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingCall = partitionStopTrackingFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingCall);
+				break;
+			case STOP_TRACKING_AND_RELEASE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseCall);
+				break;
+		}
+	}
+
+	private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
+		final JobVertex producerVertex = createNoOpJobVertex();
+		final JobVertex consumerVertex = createNoOpJobVertex();
+		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotProvider slotProvider = new SlotProvider() {
+			@Override
+			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
+				return CompletableFuture.completedFuture(new SimpleSlot(
+					new SingleSlotTestingSlotOwner(),
+					taskManagerLocation,
+					0,
+					taskManagerGateway));
+			}
+
+			@Override
+			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+			}
+		};
+
+		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+			null,
+			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
+			new Configuration(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			slotProvider,
+			ExecutionPartitionLifecycleTest.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			VoidBlobWriter.getInstance(),
+			Time.seconds(10),
+			log,
+			shuffleMaster,
+			partitionTracker);
+
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
+		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+		execution = executionVertex.getCurrentExecutionAttempt();
+
+		execution.allocateResourcesForExecution(
+			executionGraph.getSlotProviderStrategy(),
+			LocationPreferenceConstraint.ALL,
+			Collections.emptySet());
+
+		execution.deploy();
+		execution.switchToRunning();
+
+		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
+			.getProducedDataSets()[0]
+			.getPartitions()[0]
+			.getPartitionId();
+
+		descriptor = execution
+			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
+		taskExecutorResourceId = taskManagerLocation.getResourceID();
+		jobId = executionGraph.getJobID();
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	/**
+	 * Slot owner which records the first returned slot.
+	 */
+	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
+
+		final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
+
+		@Override
+		public void returnLogicalSlot(LogicalSlot logicalSlot) {
+			returnedSlot.complete(logicalSlot);
+		}
+	}
+
+	private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
+
+		final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
+
+		@Override
+		public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
+			return CompletableFuture.completedFuture(new ShuffleDescriptor() {
+				@Override
+				public ResultPartitionID getResultPartitionID() {
+					return new ResultPartitionID(
+						partitionDescriptor.getPartitionId(),
+						producerDescriptor.getProducerExecutionId());
+				}
+
+				@Override
+				public Optional<ResourceID> storesLocalResourcesOn() {
+					return Optional.of(producerDescriptor.getProducerLocation());
+				}
+			});
+		}
+
+		@Override
+		public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+			externallyReleasedPartitions.add(shuffleDescriptor);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index b8c83fe..433e3f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,33 +19,16 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +46,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,9 +54,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger {
 		assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
 	}
 
-	@Test
-	public void testPartitionRetainedWhenFinished() throws Exception {
-		testPartitionTrackingForStateTransition(Execution::markFinished, false);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenCanceled() throws Exception {
-		testPartitionTrackingForStateTransition(
-			execution -> {
-				execution.cancel();
-				execution.completeCancelling();
-			},
-			true);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenFailed() throws Exception {
-		testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true);
-	}
-
-	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception {
-		final JobVertex producerVertex = createNoOpJobVertex();
-		final JobVertex consumerVertex = createNoOpJobVertex();
-		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-
-		final SlotProvider slotProvider = new SlotProvider() {
-			@Override
-			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
-				return CompletableFuture.completedFuture(new SimpleSlot(
-					new SingleSlotTestingSlotOwner(),
-					taskManagerLocation,
-					0,
-					new SimpleAckingTaskManagerGateway()));
-			}
-
-			@Override
-			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-			}
-		};
-
-		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
-		CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>();
-		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
-		partitionTracker.setStartTrackingPartitionsConsumer(
-			(resourceID, resultPartitionDeploymentDescriptor) ->
-				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
-		);
-		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);
-
-		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
-			new Configuration(),
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			slotProvider,
-			ExecutionTest.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(10),
-			new NoRestartStrategy(),
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			Time.seconds(10),
-			log,
-			NettyShuffleMaster.INSTANCE,
-			partitionTracker);
-
-		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
-		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
-
-		final Execution execution = executionVertex.getCurrentExecutionAttempt();
-
-		execution.allocateResourcesForExecution(
-			executionGraph.getSlotProviderStrategy(),
-			LocationPreferenceConstraint.ALL,
-			Collections.emptySet());
-
-		assertThat(partitionStartTrackingFuture.isDone(), is(true));
-		final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
-
-		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
-			.getProducedDataSets()[0]
-			.getPartitions()[0]
-			.getPartitionId();
-		final ResultPartitionDeploymentDescriptor descriptor = execution
-			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
-		assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID()));
-		assertThat(startTrackingCall.f1, equalTo(descriptor));
-
-		execution.deploy();
-		execution.switchToRunning();
-
-		stateTransition.accept(execution);
-
-		assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased));
-		if (shouldPartitionBeReleased) {
-			final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get();
-			assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
-		}
-	}
-
 	/**
 	 * Tests that a slot release will atomically release the assigned {@link Execution}.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 0d1a160..2888b31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 72892d6..b1a58a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -96,7 +96,7 @@ public enum PartitionTestUtils {
 		}
 	}
 
-	static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
 		ResultPartitionType partitionType) {
 		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 5ca7156..63e4f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals;
 public class PartitionTrackerImplTest extends TestLogger {
 
 	@Test
-	public void testReleasedOnConsumptionPartitionIsNotTracked() {
+	public void testPipelinedPartitionIsNotTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
 	}
 
 	@Test
-	public void testRetainedOnConsumptionPartitionIsTracked() {
+	public void testBlockingPartitionIsTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
 	}
 
@@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 				resultPartitionType,
 				false));
 
-		final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
-		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
+		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking()));
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2d85ff6..6ba333d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -36,6 +36,7 @@ public class TestingPartitionTracker implements PartitionTracker {
 	private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
 	private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
 	private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+	private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {};
 
 	public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
 		this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer;
@@ -61,6 +62,10 @@ public class TestingPartitionTracker implements PartitionTracker {
 		this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer;
 	}
 
+	public void setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) {
+		this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer;
+	}
+
 	@Override
 	public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
 		this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor);
@@ -77,6 +82,11 @@ public class TestingPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		stopTrackingPartitionsConsumer.accept(resultPartitionIds);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d87cf84..6b6b9e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -90,6 +91,7 @@ import java.util.stream.StreamSupport;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@Test
-	public void testPartitionReleaseAfterDisconnect() throws Exception {
+	public void testBlockingPartitionReleaseAfterDisconnect() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+	public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
+			false,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterShutdown() throws Exception {
+	public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
+			true,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterShutdown() throws Exception {
 		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> { },
-			false);
+			false,
+			ResultPartitionType.BLOCKING);
+	}
+
+	@Test
+	public void testPipelinedPartitionReleaseAfterShutdown() throws Exception {
+		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> { },
+			false,
+			ResultPartitionType.PIPELINED);
 	}
 
 	private void testPartitionRelease(
 		TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releaseAction,
-		boolean waitForRelease) throws Exception {
+		boolean waitForRelease,
+		ResultPartitionType resultPartitionType) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-			PartitionTestUtils.createPartitionDeploymentDescriptor();
+			PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
 		final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
 		final TaskDeploymentDescriptor taskDeploymentDescriptor =
@@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is still running => the partition is in in-progress
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			TestingInvokable.sync.releaseBlocker();
 			taskFinishedFuture.get(timeout.getSize(), timeout.getUnit());
@@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is finished => the partition should be finished now
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 			runInTaskExecutorThreadAndWait(

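The adjusted assertions encode the TM-side invariant behind this change: the PartitionTable only ever tracks blocking partitions, while an explicit releasePartitions call tears down pipelined and blocking partitions alike. A one-method condensation of the tracking rule (hypothetical, for illustration):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

    final class TaskExecutorTrackingSketch {
        // mirrors: assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking()))
        static boolean trackedOnTaskExecutor(ResultPartitionType type) {
            return type.isBlocking();
        }
    }
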

[flink] 01/02: [hotfix][coordination] Check whether partition set to track is empty

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc72a7c95b4d745e2df64bd75857e77f4d5ca14a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 26 12:49:45 2019 +0200

    [hotfix][coordination] Check whether partition set to track is empty
---
 .../flink/runtime/taskexecutor/partition/PartitionTable.java     | 4 ++++
 .../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++
 2 files changed, 13 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 02942cf..d214e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -51,6 +51,10 @@ public class PartitionTable<K> {
 		Preconditions.checkNotNull(key);
 		Preconditions.checkNotNull(newPartitionIds);
 
+		if (newPartitionIds.isEmpty()) {
+			return;
+		}
+
 		trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> {
 			if (partitionIds == null) {
 				partitionIds = new HashSet<>(8);
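
The early return above keeps an empty registration from creating a map entry for the job, so hasTrackedPartitions stays false. A runnable sketch of the guarded behavior, equivalent to the new test below (the class name is invented):

    import java.util.Collections;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;

    final class EmptyTrackingSketch {
        public static void main(String[] args) {
            final PartitionTable<JobID> table = new PartitionTable<>();
            final JobID jobId = new JobID();

            table.startTrackingPartitions(jobId, Collections.emptyList());

            // prints "false": the empty set did not mutate the table
            System.out.println(table.hasTrackedPartitions(jobId));
        }
    }
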
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index 4e54af1..e2f63fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger {
 	}
 
 	@Test
+	public void testStartTrackingZeroPartitionDoesNotMutateState() {
+		final PartitionTable<JobID> table = new PartitionTable<>();
+
+		table.startTrackingPartitions(JOB_ID, Collections.emptyList());
+
+		assertFalse(table.hasTrackedPartitions(JOB_ID));
+	}
+
+	@Test
 	public void testStopTrackingAllPartitions() {
 		final PartitionTable<JobID> table = new PartitionTable<>();