Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:44:12 UTC

[flink] 15/15: [FLINK-13371][coordination] Prevent leaks of blocking partitions

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5ab84c6238ff1f69f2151ed580410ae4c63acd7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 1 14:27:28 2019 +0200

    [FLINK-13371][coordination] Prevent leaks of blocking partitions
---
 .../flink/runtime/executiongraph/Execution.java    |  71 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |   7 +
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |   2 +-
 .../partition/PartitionTrackerImplTest.java        |   7 +-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  51 +++-
 14 files changed, 456 insertions(+), 164 deletions(-)
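
The gist of the change: pipelined partitions are still released through an RPC to the producing TaskManager (plus a notification to the ShuffleMaster), while blocking partitions now go through the PartitionTracker, which either releases them or merely stops tracking them, depending on whether the TaskManager has already cleaned them up on its own. A condensed sketch of the new cleanup contract, with names taken from the Execution.java diff below (the real method resolves the tracker via the ExecutionGraph, and logging is omitted):

    void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
        if (releasePipelinedPartitions) {
            // pipelined partitions: released via a TM RPC plus a ShuffleMaster notification
            sendReleaseIntermediateResultPartitionsRpcCall();
        }
        final Collection<ResultPartitionID> partitionIds = getPartitionIds();
        if (!partitionIds.isEmpty()) {
            if (releaseBlockingPartitions) {
                // blocking partitions: free the TM-side resources and forget them
                partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
            } else {
                // TM already cleaned up: only drop the JM-side bookkeeping
                partitionTracker.stopTrackingPartitions(partitionIds);
            }
        }
    }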

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af94188..d8a1b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
-		processFail(t, true, userAccumulators, metrics);
+		// skip release of partitions since this is only called if the TM actually sent the FAILED state update
+		// in this case all partitions have already been cleaned up
+		processFail(t, true, userAccumulators, metrics, false);
 	}
 
 	@VisibleForTesting
@@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			else if (current == CANCELING) {
 				// we sent a cancel call, and the task manager finished before it arrived. We
 				// will never get a CANCELED call back from the job manager
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return;
 			}
 			else if (current == CANCELED || current == FAILED) {
@@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void completeCancelling() {
-		completeCancelling(null, null);
+		completeCancelling(null, null, true);
 	}
 
-	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
 		// network stack is canceled (for example by a failing / canceling receiver or sender
@@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				if (transitionState(current, CANCELED)) {
-					finishCancellation();
+					finishCancellation(releasePartitions);
 					return;
 				}
 
@@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	private void finishCancellation() {
+	private void finishCancellation(boolean releasePartitions) {
 		releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
 		vertex.getExecutionGraph().deregisterExecution(this);
-		// release partitions on TM in case the Task finished while we where already CANCELING
-		stopTrackingAndReleasePartitions();
+		handlePartitionCleanup(releasePartitions, releasePartitions);
 	}
 
 	void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// --------------------------------------------------------------------------------------------
 
 	private boolean processFail(Throwable t, boolean isCallback) {
-		return processFail(t, isCallback, null, null);
+		return processFail(t, isCallback, null, null, true);
 	}
 
-	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 
@@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 
 			if (current == CANCELING) {
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return false;
 			}
 
@@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				releaseAssignedResource(t);
 				vertex.getExecutionGraph().deregisterExecution(this);
-				stopTrackingAndReleasePartitions();
+				handlePartitionCleanup(releasePartitions, releasePartitions);
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
 					if (LOG.isDebugEnabled()) {
@@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	void stopTrackingAndReleasePartitions() {
+	void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
+		if (releasePipelinedPartitions) {
+			sendReleaseIntermediateResultPartitionsRpcCall();
+		}
+
+		final Collection<ResultPartitionID> partitionIds = getPartitionIds();
+		final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
+
+		if (!partitionIds.isEmpty()) {
+			if (releaseBlockingPartitions) {
+				LOG.info("Discarding the results produced by task execution {}.", attemptId);
+				partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+			} else {
+				partitionTracker.stopTrackingPartitions(partitionIds);
+			}
+		}
+	}
+
+	private Collection<ResultPartitionID> getPartitionIds() {
+		return producedPartitions.values().stream()
+			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+			.map(ShuffleDescriptor::getResultPartitionID)
+			.collect(Collectors.toList());
+	}
+
+	private void sendReleaseIntermediateResultPartitionsRpcCall() {
 		LOG.info("Discarding the results produced by task execution {}.", attemptId);
-		if (producedPartitions != null && producedPartitions.size() > 0) {
-			final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
-			final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream()
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			final ShuffleMaster<?> shuffleMaster = getVertex().getExecutionGraph().getShuffleMaster();
+
+			Collection<ResultPartitionID> partitionIds = producedPartitions.values().stream()
+				.filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined())
 				.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+				.peek(shuffleMaster::releasePartitionExternally)
 				.map(ShuffleDescriptor::getResultPartitionID)
 				.collect(Collectors.toList());
 
-			partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
+			if (!partitionIds.isEmpty()) {
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
+			}
 		}
 	}
 
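Read together with the ExecutionGraph and ExecutionVertex hunks below, the call sites above reduce to three cases:

    trigger                                                 handlePartitionCleanup(pipelined, blocking)
    JM-side fail()/cancel() (TM state unknown)              (true,  true)
    TM reported FAILED/CANCELED (TM already cleaned up)     (false, false)
    reset of a FINISHED vertex (pipelined released earlier) (false, true)

Note that even in the (false, false) case the JM still calls stopTrackingPartitions, so the tracker does not retain stale entries for partitions the TaskManager has already removed.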
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f779fd2..0984274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1525,7 +1525,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			case CANCELED:
 				// this deserialization is exception-free
 				accumulators = deserializeAccumulators(state);
-				attempt.completeCancelling(accumulators, state.getIOMetrics());
+				attempt.completeCancelling(accumulators, state.getIOMetrics(), false);
 				return true;
 
 			case FAILED:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 03c68f8..6d262ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -604,7 +604,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 			if (oldState.isTerminal()) {
 				if (oldState == FINISHED) {
-					oldExecution.stopTrackingAndReleasePartitions();
+					// pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
+					// do not release pipelined partitions here to save RPC calls
+					oldExecution.handlePartitionCleanup(false, true);
 					getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
 				}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 3697fb8..268f4f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -46,6 +46,11 @@ public interface PartitionTracker {
 	void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds);
 
 	/**
+	 * Stops the tracking of the given partitions.
+	 */
+	void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds);
+
+	/**
 	 * Releases all partitions for the given task executor ID, and stops the tracking of partitions that were released.
 	 */
 	void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId);
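
A minimal usage sketch of the widened interface; the helper and variable names here are hypothetical, only the two PartitionTracker methods are from this commit:

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.flink.runtime.io.network.partition.PartitionTracker;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    final class TrackerUsageSketch {
        // Two cleanup flavours: with releaseData, the TM-side resources are freed as well;
        // without it, only the JM-side bookkeeping is dropped and the data stays on the TM.
        static void stopTracking(PartitionTracker tracker, ResultPartitionID id, boolean releaseData) {
            final Collection<ResultPartitionID> ids = Collections.singletonList(id);
            if (releaseData) {
                tracker.stopTrackingAndReleasePartitions(ids);
            } else {
                tracker.stopTrackingPartitions(ids);
            }
        }
    }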
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index f772b37..2e7f421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		Preconditions.checkNotNull(resultPartitionIds);
+
+		resultPartitionIds.forEach(this::internalStopTrackingPartition);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		Preconditions.checkNotNull(producingTaskExecutorId);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
index ad3fc48..ec32178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
@@ -63,11 +63,11 @@ public class ProducerDescriptor {
 		this.dataPort = dataPort;
 	}
 
-	ResourceID getProducerLocation() {
+	public ResourceID getProducerLocation() {
 		return producerLocation;
 	}
 
-	ExecutionAttemptID getProducerExecutionId() {
+	public ExecutionAttemptID getProducerExecutionId() {
 		return producerExecutionId;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d0cfa00..d10fcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		Execution execution1 = executions.values().iterator().next();
 		execution1.cancel();
-		execution1.completeCancelling(accumulators, ioMetrics);
+		execution1.completeCancelling(accumulators, ioMetrics, false);
 
 		assertEquals(ioMetrics, execution1.getIOMetrics());
 		assertEquals(accumulators, execution1.getUserAccumulators());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
new file mode 100644
index 0000000..ab4048d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the partition lifecycle of the {@link Execution}.
+ */
+public class ExecutionPartitionLifecycleTest extends TestLogger {
+
+	@ClassRule
+	public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
+		new TestingComponentMainThreadExecutor.Resource();
+
+	private Execution execution;
+	private ResultPartitionDeploymentDescriptor descriptor;
+	private ResourceID taskExecutorResourceId;
+	private JobID jobId;
+
+	@Test
+	public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
+		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>();
+		taskManagerGateway.setReleasePartitionsConsumer(((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds))));
+
+		final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
+
+		execution.cancel();
+		assertFalse(releasePartitionsCallFuture.isDone());
+
+		execution.markFinished();
+		assertTrue(releasePartitionsCallFuture.isDone());
+
+		final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get();
+		assertEquals(jobId, releasePartitionsCall.f0);
+		assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1);
+
+		assertEquals(1, testingShuffleMaster.externallyReleasedPartitions.size());
+		assertEquals(descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll());
+	}
+
+	private enum PartitionReleaseResult {
+		NONE,
+		STOP_TRACKING,
+		STOP_TRACKING_AND_RELEASE
+	}
+
+	@Test
+	public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
+		testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+			},
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling();
+			},
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(
+				new Exception("Test exception"),
+				Collections.emptyMap(),
+				new IOMetrics(0, 0, 0, 0)),
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(new Exception("Test exception")),
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final PartitionReleaseResult partitionReleaseResult) throws Exception {
+		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setStartTrackingPartitionsConsumer(
+			(resourceID, resultPartitionDeploymentDescriptor) ->
+				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
+		);
+		partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE);
+
+		Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
+		assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
+		assertThat(startTrackingCall.f1, equalTo(descriptor));
+
+		stateTransition.accept(execution);
+
+		switch (partitionReleaseResult) {
+			case NONE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				break;
+			case STOP_TRACKING:
+				assertTrue(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingCall = partitionStopTrackingFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingCall);
+				break;
+			case STOP_TRACKING_AND_RELEASE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseCall);
+				break;
+		}
+	}
+
+	private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
+		final JobVertex producerVertex = createNoOpJobVertex();
+		final JobVertex consumerVertex = createNoOpJobVertex();
+		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotProvider slotProvider = new SlotProvider() {
+			@Override
+			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
+				return CompletableFuture.completedFuture(new SimpleSlot(
+					new SingleSlotTestingSlotOwner(),
+					taskManagerLocation,
+					0,
+					taskManagerGateway));
+			}
+
+			@Override
+			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+			}
+		};
+
+		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+			null,
+			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
+			new Configuration(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			slotProvider,
+			ExecutionPartitionLifecycleTest.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			VoidBlobWriter.getInstance(),
+			Time.seconds(10),
+			log,
+			shuffleMaster,
+			partitionTracker);
+
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
+		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+		execution = executionVertex.getCurrentExecutionAttempt();
+
+		execution.allocateResourcesForExecution(
+			executionGraph.getSlotProviderStrategy(),
+			LocationPreferenceConstraint.ALL,
+			Collections.emptySet());
+
+		execution.deploy();
+		execution.switchToRunning();
+
+		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
+			.getProducedDataSets()[0]
+			.getPartitions()[0]
+			.getPartitionId();
+
+		descriptor = execution
+			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
+		taskExecutorResourceId = taskManagerLocation.getResourceID();
+		jobId = executionGraph.getJobID();
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	/**
+	 * Slot owner which records the first returned slot.
+	 */
+	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
+
+		final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
+
+		@Override
+		public void returnLogicalSlot(LogicalSlot logicalSlot) {
+			returnedSlot.complete(logicalSlot);
+		}
+	}
+
+	private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
+
+		final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
+
+		@Override
+		public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
+			return CompletableFuture.completedFuture(new ShuffleDescriptor() {
+				@Override
+				public ResultPartitionID getResultPartitionID() {
+					return new ResultPartitionID(
+						partitionDescriptor.getPartitionId(),
+						producerDescriptor.getProducerExecutionId());
+				}
+
+				@Override
+				public Optional<ResourceID> storesLocalResourcesOn() {
+					return Optional.of(producerDescriptor.getProducerLocation());
+				}
+			});
+		}
+
+		@Override
+		public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+			externallyReleasedPartitions.add(shuffleDescriptor);
+		}
+	}
+}
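
The new test class above replaces the partition-tracking tests that the next hunk removes from ExecutionTest. Beyond porting them, it distinguishes plain stop-tracking from stop-tracking-and-release, and its TestingShuffleMaster records every releasePartitionExternally call, letting testPartitionReleaseOnFinishWhileCanceling assert that a pipelined release also reaches the ShuffleMaster; this is presumably why the ProducerDescriptor getters were widened to public in the hunk above, since the test lives in a different package.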
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index b8c83fe..433e3f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,33 +19,16 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +46,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,9 +54,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger {
 		assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
 	}
 
-	@Test
-	public void testPartitionRetainedWhenFinished() throws Exception {
-		testPartitionTrackingForStateTransition(Execution::markFinished, false);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenCanceled() throws Exception {
-		testPartitionTrackingForStateTransition(
-			execution -> {
-				execution.cancel();
-				execution.completeCancelling();
-			},
-			true);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenFailed() throws Exception {
-		testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true);
-	}
-
-	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception {
-		final JobVertex producerVertex = createNoOpJobVertex();
-		final JobVertex consumerVertex = createNoOpJobVertex();
-		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-
-		final SlotProvider slotProvider = new SlotProvider() {
-			@Override
-			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
-				return CompletableFuture.completedFuture(new SimpleSlot(
-					new SingleSlotTestingSlotOwner(),
-					taskManagerLocation,
-					0,
-					new SimpleAckingTaskManagerGateway()));
-			}
-
-			@Override
-			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-			}
-		};
-
-		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
-		CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>();
-		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
-		partitionTracker.setStartTrackingPartitionsConsumer(
-			(resourceID, resultPartitionDeploymentDescriptor) ->
-				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
-		);
-		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);
-
-		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
-			new Configuration(),
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			slotProvider,
-			ExecutionTest.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(10),
-			new NoRestartStrategy(),
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			Time.seconds(10),
-			log,
-			NettyShuffleMaster.INSTANCE,
-			partitionTracker);
-
-		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
-		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
-
-		final Execution execution = executionVertex.getCurrentExecutionAttempt();
-
-		execution.allocateResourcesForExecution(
-			executionGraph.getSlotProviderStrategy(),
-			LocationPreferenceConstraint.ALL,
-			Collections.emptySet());
-
-		assertThat(partitionStartTrackingFuture.isDone(), is(true));
-		final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
-
-		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
-			.getProducedDataSets()[0]
-			.getPartitions()[0]
-			.getPartitionId();
-		final ResultPartitionDeploymentDescriptor descriptor = execution
-			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
-		assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID()));
-		assertThat(startTrackingCall.f1, equalTo(descriptor));
-
-		execution.deploy();
-		execution.switchToRunning();
-
-		stateTransition.accept(execution);
-
-		assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased));
-		if (shouldPartitionBeReleased) {
-			final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get();
-			assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
-		}
-	}
-
 	/**
 	 * Tests that a slot release will atomically release the assigned {@link Execution}.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 0d1a160..2888b31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 72892d6..b1a58a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -96,7 +96,7 @@ public enum PartitionTestUtils {
 		}
 	}
 
-	static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
 		ResultPartitionType partitionType) {
 		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 5ca7156..63e4f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals;
 public class PartitionTrackerImplTest extends TestLogger {
 
 	@Test
-	public void testReleasedOnConsumptionPartitionIsNotTracked() {
+	public void testPipelinedPartitionIsNotTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
 	}
 
 	@Test
-	public void testRetainedOnConsumptionPartitionIsTracked() {
+	public void testBlockingPartitionIsTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
 	}
 
@@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 				resultPartitionType,
 				false));
 
-		final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
-		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
+		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking()));
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2d85ff6..6ba333d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -36,6 +36,7 @@ public class TestingPartitionTracker implements PartitionTracker {
 	private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
 	private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
 	private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+	private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {};
 
 	public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
 		this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer;
@@ -61,6 +62,10 @@ public class TestingPartitionTracker implements PartitionTracker {
 		this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer;
 	}
 
+	public void setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) {
+		this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer;
+	}
+
 	@Override
 	public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
 		this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor);
@@ -77,6 +82,11 @@ public class TestingPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		stopTrackingPartitionsConsumer.accept(resultPartitionIds);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d87cf84..6b6b9e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -90,6 +91,7 @@ import java.util.stream.StreamSupport;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@Test
-	public void testPartitionReleaseAfterDisconnect() throws Exception {
+	public void testBlockingPartitionReleaseAfterDisconnect() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+	public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
+			false,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterShutdown() throws Exception {
+	public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
+			true,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterShutdown() throws Exception {
 		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> { },
-			false);
+			false,
+			ResultPartitionType.BLOCKING);
+	}
+
+	@Test
+	public void testPipelinedPartitionReleaseAfterShutdown() throws Exception {
+		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> { },
+			false,
+			ResultPartitionType.PIPELINED);
 	}
 
 	private void testPartitionRelease(
 		TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releaseAction,
-		boolean waitForRelease) throws Exception {
+		boolean waitForRelease,
+		ResultPartitionType resultPartitionType) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-			PartitionTestUtils.createPartitionDeploymentDescriptor();
+			PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
 		final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
 		final TaskDeploymentDescriptor taskDeploymentDescriptor =
@@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is still running => the partition is still in-progress
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			TestingInvokable.sync.releaseBlocker();
 			taskFinishedFuture.get(timeout.getSize(), timeout.getUnit());
@@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is finished => the partition should be finished now
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 			runInTaskExecutorThreadAndWait(