You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:43:59 UTC

[flink] 02/15: [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4d7bff09a3d4c1816dc235d616753095f626243
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri Jul 26 12:13:23 2019 +0200

    [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type
    
    In a long term we do not need auto-release semantics for blocking (persistent) partition. We expect them always to be released externally by JM and assume they can be consumed multiple times.
    
    The pipelined partitions have always only one consumer and one consumption attempt. Afterwards they can be always released automatically.
    
    ShuffleDescriptor.ReleaseType was introduced to make release semantics more flexible but it is not needed in a long term.
    
    FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be able to fallback to 1.8 behaviour without the partition tracker and JM taking care about blocking partition release. We can make this option specific for NettyShuffleEnvironment which was the only existing shuffle service before. If it is activated then the blocking partition is also auto-released on a consumption attempt as it was before. The fine-grained recovery will just not find the partition after the jo [...]
---
 .../flink/configuration/JobManagerOptions.java     |  5 --
 .../NettyShuffleEnvironmentOptions.java            |  5 ++
 .../ResultPartitionDeploymentDescriptor.java       | 57 ----------------------
 .../flink/runtime/executiongraph/Execution.java    |  9 +---
 .../runtime/executiongraph/ExecutionGraph.java     | 10 ----
 .../executiongraph/ExecutionGraphBuilder.java      |  4 --
 .../io/network/NettyShuffleServiceFactory.java     |  3 +-
 .../io/network/partition/PartitionTrackerImpl.java |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 10 ++--
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 19 +-------
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 34 +------------
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 26 +++++-----
 .../flink/runtime/shuffle/ShuffleMaster.java       |  5 --
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 ---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../NettyShuffleEnvironmentConfiguration.java      | 21 ++++++--
 .../ResultPartitionDeploymentDescriptorTest.java   | 15 +-----
 .../io/network/NettyShuffleEnvironmentBuilder.java |  3 +-
 .../io/network/partition/PartitionTestUtils.java   | 56 ++-------------------
 .../partition/PartitionTrackerImplTest.java        | 43 ++++++++--------
 .../network/partition/ResultPartitionBuilder.java  |  4 +-
 .../partition/ResultPartitionFactoryTest.java      | 26 ++++++----
 .../TaskExecutorPartitionLifecycleTest.java        |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        | 13 +----
 .../recovery/BatchFineGrainedRecoveryITCase.java   |  2 +-
 26 files changed, 103 insertions(+), 287 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index e062829..3643667 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -195,11 +195,6 @@ public class JobManagerOptions {
 			.defaultValue(true)
 			.withDescription("Controls whether partitions should already be released during the job execution.");
 
-	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
-	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-			key("jobmanager.scheduler.partition.force-release-on-consumption")
-			.defaultValue(false);
-
 	// ---------------------------------------------------------------------------------------------
 
 	private JobManagerOptions() {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 4ba4c8e..733085e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -231,6 +231,11 @@ public class NettyShuffleEnvironmentOptions {
 
 	// ------------------------------------------------------------------------
 
+	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
+	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+		key("taskmanager.network.partition.force-release-on-consumption")
+			.defaultValue(false);
+
 	/** Not intended to be instantiated. */
 	private NettyShuffleEnvironmentOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 39b61d2..064c9bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,22 +18,16 @@
 
 package org.apache.flink.runtime.deployment;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import java.io.Serializable;
-import java.util.Collection;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -54,48 +48,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
 	private final boolean sendScheduleOrUpdateConsumersMessage;
 
-	private final ReleaseType releaseType;
-
-	@VisibleForTesting
 	public ResultPartitionDeploymentDescriptor(
 			PartitionDescriptor partitionDescriptor,
 			ShuffleDescriptor shuffleDescriptor,
 			int maxParallelism,
 			boolean sendScheduleOrUpdateConsumersMessage) {
-		this(
-			checkNotNull(partitionDescriptor),
-			shuffleDescriptor,
-			maxParallelism,
-			sendScheduleOrUpdateConsumersMessage,
-			ReleaseType.AUTO);
-	}
-
-	public ResultPartitionDeploymentDescriptor(
-			PartitionDescriptor partitionDescriptor,
-			ShuffleDescriptor shuffleDescriptor,
-			int maxParallelism,
-			boolean sendScheduleOrUpdateConsumersMessage,
-			ReleaseType releaseType) {
-		checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, releaseType);
 		this.partitionDescriptor = checkNotNull(partitionDescriptor);
 		this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
 		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		this.maxParallelism = maxParallelism;
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-		this.releaseType = releaseType;
-	}
-
-	private static void checkReleaseOnConsumptionIsSupportedForPartition(
-			ShuffleDescriptor shuffleDescriptor,
-			ReleaseType releaseType) {
-		checkNotNull(shuffleDescriptor);
-		checkArgument(
-			shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
-			"Release type %s is not supported by the shuffle service for this partition" +
-				"(id: %s), supported release types: %s",
-			releaseType,
-			shuffleDescriptor.getResultPartitionID(),
-			shuffleDescriptor.getSupportedReleaseTypes());
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -126,25 +88,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return sendScheduleOrUpdateConsumersMessage;
 	}
 
-	/**
-	 * Returns whether to release the partition after having been fully consumed once.
-	 *
-	 * <p>Indicates whether the shuffle service should automatically release all partition resources after
-	 * the first full consumption has been acknowledged. This kind of partition does not need to be explicitly released
-	 * by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-	 * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-	 *
-	 * <p>The partition has to support the corresponding {@link ReleaseType} in
-	 * {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
-	 * {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to return {@code true} and
-	 * {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to return {@code false}.
-	 *
-	 * @return whether to release the partition after having been fully consumed once.
-	 */
-	public boolean isReleasedOnConsumption() {
-		return releaseType == ReleaseType.AUTO;
-	}
-
 	@Override
 	public String toString() {
 		return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6679fe1..af94188 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -631,19 +631,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				.getShuffleMaster()
 				.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
 
-			final boolean releasePartitionOnConsumption =
-				vertex.getExecutionGraph().isForcePartitionReleaseOnConsumption()
-				|| !partitionDescriptor.getPartitionType().isBlocking();
-
 			CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration = shuffleDescriptorFuture
 				.thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(
 					partitionDescriptor,
 					shuffleDescriptor,
 					maxParallelism,
-					lazyScheduling,
-					releasePartitionOnConsumption
-						? ShuffleDescriptor.ReleaseType.AUTO
-						: ShuffleDescriptor.ReleaseType.MANUAL));
+					lazyScheduling));
 			partitionRegistrations.add(partitionRegistration);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cc51042..f779fd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -320,8 +320,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	/** Shuffle master to register partitions for task deployment. */
 	private final ShuffleMaster<?> shuffleMaster;
 
-	private boolean forcePartitionReleaseOnConsumption;
-
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -425,7 +423,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			allocationTimeout,
 			new NotReleasingPartitionReleaseStrategy.Factory(),
 			NettyShuffleMaster.INSTANCE,
-			true,
 			new PartitionTrackerImpl(
 				jobInformation.getJobId(),
 				NettyShuffleMaster.INSTANCE,
@@ -448,7 +445,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			Time allocationTimeout,
 			PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
 			ShuffleMaster<?> shuffleMaster,
-			boolean forcePartitionReleaseOnConsumption,
 			PartitionTracker partitionTracker,
 			ScheduleMode scheduleMode,
 			boolean allowQueuedScheduling) throws IOException {
@@ -511,8 +507,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		this.shuffleMaster = checkNotNull(shuffleMaster);
 
-		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
-
 		this.partitionTracker = checkNotNull(partitionTracker);
 
 		this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
@@ -737,10 +731,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return globalModVersion - 1;
 	}
 
-	boolean isForcePartitionReleaseOnConsumption() {
-		return forcePartitionReleaseOnConsumption;
-	}
-
 	@Override
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 018fd81..adbd5fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -161,9 +161,6 @@ public class ExecutionGraphBuilder {
 		final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
 			PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
 
-		final boolean forcePartitionReleaseOnConsumption =
-			jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph;
 		try {
@@ -182,7 +179,6 @@ public class ExecutionGraphBuilder {
 					allocationTimeout,
 					partitionReleaseStrategyFactory,
 					shuffleMaster,
-					forcePartitionReleaseOnConsumption,
 					partitionTracker,
 					jobGraph.getScheduleMode(),
 					jobGraph.getAllowQueuedScheduling());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 431360a..f266c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -104,7 +104,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			config.getBlockingSubpartitionType(),
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate(),
-			config.networkBufferSize());
+			config.networkBufferSize(),
+			config.isForcePartitionReleaseOnConsumption());
 
 		SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
 			taskExecutorResourceId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index c52e8b1..f772b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -64,8 +64,8 @@ public class PartitionTrackerImpl implements PartitionTracker {
 		Preconditions.checkNotNull(producingTaskExecutorId);
 		Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
 
-		// if it is released on consumption we do not need to issue any release calls
-		if (resultPartitionDeploymentDescriptor.isReleasedOnConsumption()) {
+		// only blocking partitions require explicit release call
+		if (!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
 			return;
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 4d5cf23..b390987 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -63,6 +63,8 @@ public class ResultPartitionFactory {
 
 	private final int networkBufferSize;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	public ResultPartitionFactory(
 		@Nonnull ResultPartitionManager partitionManager,
 		@Nonnull FileChannelManager channelManager,
@@ -70,7 +72,8 @@ public class ResultPartitionFactory {
 		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
-		int networkBufferSize) {
+		int networkBufferSize,
+		boolean forcePartitionReleaseOnConsumption) {
 
 		this.partitionManager = partitionManager;
 		this.channelManager = channelManager;
@@ -79,6 +82,7 @@ public class ResultPartitionFactory {
 		this.bufferPoolFactory = bufferPoolFactory;
 		this.blockingSubpartitionType = blockingSubpartitionType;
 		this.networkBufferSize = networkBufferSize;
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 	}
 
 	public ResultPartition create(
@@ -91,7 +95,6 @@ public class ResultPartitionFactory {
 			desc.getPartitionType(),
 			desc.getNumberOfSubpartitions(),
 			desc.getMaxParallelism(),
-			desc.isReleasedOnConsumption(),
 			createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
 	}
 
@@ -102,12 +105,11 @@ public class ResultPartitionFactory {
 		@Nonnull ResultPartitionType type,
 		int numberOfSubpartitions,
 		int maxParallelism,
-		boolean releasePartitionOnConsumption,
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
-		ResultPartition partition = releasePartitionOnConsumption
+		ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
 			? new ReleaseOnConsumptionResultPartition(
 				taskNameWithSubtaskAndId,
 				id,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 8086b27..f758bcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -35,29 +34,19 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 
 	private static final long serialVersionUID = 852181945034989215L;
 
-	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
-		EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);
-
-	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
-		EnumSet.of(ReleaseType.AUTO);
-
 	private final ResourceID producerLocation;
 
 	private final PartitionConnectionInfo partitionConnectionInfo;
 
 	private final ResultPartitionID resultPartitionID;
 
-	private final boolean isBlocking;
-
 	public NettyShuffleDescriptor(
 			ResourceID producerLocation,
 			PartitionConnectionInfo partitionConnectionInfo,
-			ResultPartitionID resultPartitionID,
-			boolean isBlocking) {
+			ResultPartitionID resultPartitionID) {
 		this.producerLocation = producerLocation;
 		this.partitionConnectionInfo = partitionConnectionInfo;
 		this.resultPartitionID = resultPartitionID;
-		this.isBlocking = isBlocking;
 	}
 
 	public ConnectionID getConnectionId() {
@@ -74,12 +63,6 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 		return Optional.of(producerLocation);
 	}
 
-	@Override
-	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-		return isBlocking ?
-			SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
-	}
-
 	public boolean isLocalTo(ResourceID consumerLocation) {
 		return producerLocation.equals(consumerLocation);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index c369ff1..6c2cb32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -42,8 +42,7 @@ public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 		NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor(
 			producerDescriptor.getProducerLocation(),
 			createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()),
-			resultPartitionID,
-			partitionDescriptor.getPartitionType().isBlocking());
+			resultPartitionID);
 
 		return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 5af56f2..17feacb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -19,12 +19,10 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -61,39 +59,11 @@ public interface ShuffleDescriptor extends Serializable {
 	 *
 	 * <p>Indicates that this partition occupies local resources in the producing task executor. Such partition requires
 	 * that the task executor is running and being connected to be able to consume the produced data. This is mostly
-	 * relevant for the batch jobs and blocking result partitions which should outlive the producer lifetime and
-	 * be released externally: {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false}.
+	 * relevant for the batch jobs and blocking result partitions which can outlive the producer lifetime and
+	 * be released externally.
 	 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)} can be used to release such kind of partitions locally.
 	 *
 	 * @return the resource id of the producing task executor if the partition occupies local resources there
 	 */
 	Optional<ResourceID> storesLocalResourcesOn();
-
-	/**
-	 * Return release types supported by Shuffle Service for this partition.
-	 */
-	EnumSet<ReleaseType> getSupportedReleaseTypes();
-
-	/**
-	 * Partition release type.
-	 */
-	enum ReleaseType {
-		/**
-		 * Auto-release the partition after having been fully consumed once.
-		 *
-		 * <p>No additional actions required, like {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-		 * or {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}
-		 */
-		AUTO,
-
-		/**
-		 * Manually release the partition, the partition has to support consumption multiple times.
-		 *
-		 * <p>The partition requires manual release once all consumption is done:
-		 * {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
-		 * if the partition occupies producer local resources ({@link #storesLocalResourcesOn()}) then also
-		 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-		 */
-		MANUAL
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index ed66f2d..e6a4802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -61,17 +60,18 @@ import java.util.Collection;
  *     <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called
  *     if the production has failed.
  *     </li>
- *     <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} and
- *     {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called when the production is done.
- *     The actual release can take some time depending on implementation details,
- *     e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.
- *     The partition has to support the {@link ReleaseType#AUTO} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
- *     <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} and
- *     {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
- *     if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()}),
- *     are called outside of the producer thread, e.g. to manage the lifecycle of BLOCKING result partitions
- *     which can outlive their producers. The partition has to support the {@link ReleaseType#MANUAL} in
- *     {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
+ *     <li>for PIPELINED partitions if there was a detected consumption attempt and it either failed or finished
+ *     after the bounded production has been done ({@link ResultPartitionWriter#finish()} and
+ *     {@link ResultPartitionWriter#close()} have been called). Only one consumption attempt is ever expected for
+ *     the PIPELINED partition at the moment so it can be released afterwards.
+ *     <li>if the following methods are called outside of the producer thread:
+ *     <ol>
+ *         <li>{@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}</li>
+ *         <li>and if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()})
+ *             then also {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}</li>
+ *     </ol>
+ *     e.g. to manage the lifecycle of BLOCKING result partitions which can outlive their producers.
+ *     The BLOCKING partitions can be consumed multiple times.</li>
  * </ol>
  * The partitions, which currently still occupy local resources, can be queried with
  * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -132,8 +132,6 @@ public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends I
 	 *
 	 * <p>This is called for partitions which occupy resources locally
 	 * (can be checked by {@link ShuffleDescriptor#storesLocalResourcesOn()}).
-	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 *
 	 * @param partitionIds identifying the partitions to be released
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index a9ef1c6..9f729c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.shuffle;
 
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -51,8 +48,6 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
 	 *
 	 * <p>This call triggers release of any resources which are occupied by the given partition in the external systems
 	 * outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions.
-	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 * The producer local resources are managed by {@link ShuffleDescriptor#storesLocalResourcesOn()} and
 	 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
index 7c35516..339f343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -57,9 +56,4 @@ public final class UnknownShuffleDescriptor implements ShuffleDescriptor {
 	public Optional<ResourceID> storesLocalResourcesOn() {
 		return Optional.empty();
 	}
-
-	@Override
-	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-		return EnumSet.noneOf(ReleaseType.class);
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8c99c0f..9b295dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -613,8 +613,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture<ExecutionState> terminationFuture) {
 		final List<ResultPartitionID> partitionsRequiringRelease = tdd.getProducedPartitions().stream()
-			// partitions that are released on consumption don't require any explicit release call
-			.filter(d -> !d.isReleasedOnConsumption())
+			// only blocking partitions require explicit release call
+			.filter(d -> d.getPartitionType().isBlocking())
 			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
 			// partitions without local resources don't store anything on the TaskExecutor
 			.filter(d -> d.storesLocalResourcesOn().isPresent())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index e73cc6a..38817a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -73,6 +73,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	public NettyShuffleEnvironmentConfiguration(
 			int numNetworkBuffers,
 			int networkBufferSize,
@@ -85,7 +87,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			boolean isNetworkDetailedMetrics,
 			@Nullable NettyConfig nettyConfig,
 			String[] tempDirs,
-			BoundedBlockingSubpartitionType blockingSubpartitionType) {
+			BoundedBlockingSubpartitionType blockingSubpartitionType,
+			boolean forcePartitionReleaseOnConsumption) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
 		this.networkBufferSize = networkBufferSize;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.nettyConfig = nettyConfig;
 		this.tempDirs = Preconditions.checkNotNull(tempDirs);
 		this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 	}
 
 	// ------------------------------------------------------------------------
@@ -151,6 +155,10 @@ public class NettyShuffleEnvironmentConfiguration {
 		return blockingSubpartitionType;
 	}
 
+	public boolean isForcePartitionReleaseOnConsumption() {
+		return forcePartitionReleaseOnConsumption;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -194,6 +202,9 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration);
 
+		boolean forcePartitionReleaseOnConsumption =
+			configuration.getBoolean(NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -206,7 +217,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			isNetworkDetailedMetrics,
 			nettyConfig,
 			tempDirs,
-			blockingSubpartitionType);
+			blockingSubpartitionType,
+			forcePartitionReleaseOnConsumption);
 	}
 
 	/**
@@ -521,6 +533,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		result = 31 * result + (isCreditBased ? 1 : 0);
 		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
 		result = 31 * result + Arrays.hashCode(tempDirs);
+		result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 : 0);
 		return result;
 	}
 
@@ -544,7 +557,8 @@ public class NettyShuffleEnvironmentConfiguration {
 					this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
 					this.isCreditBased == that.isCreditBased &&
 					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
-					Arrays.equals(this.tempDirs, that.tempDirs);
+					Arrays.equals(this.tempDirs, that.tempDirs) &&
+					this.forcePartitionReleaseOnConsumption == that.forcePartitionReleaseOnConsumption;
 		}
 	}
 
@@ -561,6 +575,7 @@ public class NettyShuffleEnvironmentConfiguration {
 				", isCreditBased=" + isCreditBased +
 				", nettyConfig=" + nettyConfig +
 				", tempDirs=" + Arrays.toString(tempDirs) +
+				", forcePartitionReleaseOnConsumption=" + forcePartitionReleaseOnConsumption +
 				'}';
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 49cd478..e76af09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -90,8 +88,7 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			resultPartitionID,
-			false);
+			resultPartitionID);
 
 		ResultPartitionDeploymentDescriptor copy =
 			createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -104,16 +101,6 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
 	}
 
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompatibleReleaseTypeManual() {
-		new ResultPartitionDeploymentDescriptor(
-			partitionDescriptor,
-			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
-			1,
-			true,
-			ReleaseType.MANUAL);
-	}
-
 	private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPartitionDeploymentDescriptor(
 			ShuffleDescriptor shuffleDescriptor) throws IOException {
 		ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 2d360ba..96b6330 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -143,7 +143,8 @@ public class NettyShuffleEnvironmentBuilder {
 				isNetworkDetailedMetrics,
 				nettyConfig,
 				tempDirs,
-				BoundedBlockingSubpartitionType.AUTO),
+				BoundedBlockingSubpartitionType.AUTO,
+				false),
 			taskManagerLocation,
 			taskEventDispatcher,
 			metricGroup);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5f80421..cf50051 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -30,8 +29,6 @@ import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
@@ -97,8 +94,9 @@ public class PartitionTestUtils {
 		}
 	}
 
-	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
-		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
+	static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+		ResultPartitionType partitionType) {
+		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
 			new IntermediateDataSetID(),
 			shuffleDescriptor.getResultPartitionID().getPartitionId(),
@@ -112,52 +110,8 @@ public class PartitionTestUtils {
 			true);
 	}
 
-	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType releaseType) {
-		// set partition to blocking to support all release types
-		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal();
-		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
-			new IntermediateDataSetID(),
-			shuffleDescriptor.getResultPartitionID().getPartitionId(),
-			ResultPartitionType.BLOCKING,
-			1,
-			0);
-		return new ResultPartitionDeploymentDescriptor(
-			partitionDescriptor,
-			shuffleDescriptor,
-			1,
-			true,
-			releaseType);
-	}
-
-	public static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionId, ShuffleDescriptor.ReleaseType releaseType, boolean hasLocalResources) {
-		return new ResultPartitionDeploymentDescriptor(
-			new PartitionDescriptor(
-				new IntermediateDataSetID(),
-				resultPartitionId.getPartitionId(),
-				ResultPartitionType.BLOCKING,
-				1,
-				0),
-			new ShuffleDescriptor() {
-				@Override
-				public ResultPartitionID getResultPartitionID() {
-					return resultPartitionId;
-				}
-
-				@Override
-				public Optional<ResourceID> storesLocalResourcesOn() {
-					return hasLocalResources
-						? Optional.of(ResourceID.generate())
-						: Optional.empty();
-				}
-
-				@Override
-				public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-					return EnumSet.of(releaseType);
-				}
-			},
-			1,
-			true,
-			releaseType);
+	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor() {
+		return createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
 	}
 
 	public static void writeBuffers(ResultPartition partition, int numberOfBuffers, int bufferSize) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 07aba93..5ca7156 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -34,14 +34,12 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -53,15 +51,15 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 	@Test
 	public void testReleasedOnConsumptionPartitionIsNotTracked() {
-		testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.AUTO);
+		testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
 	}
 
 	@Test
 	public void testRetainedOnConsumptionPartitionIsTracked() {
-		testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.MANUAL);
+		testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
 	}
 
-	private void testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType releaseType) {
+	private void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
 		final PartitionTracker partitionTracker = new PartitionTrackerImpl(
 			new JobID(),
 			new TestingShuffleMaster(),
@@ -74,10 +72,11 @@ public class PartitionTrackerImplTest extends TestLogger {
 			resourceId,
 			createResultPartitionDeploymentDescriptor(
 				resultPartitionId,
-				releaseType,
+				resultPartitionType,
 				false));
 
-		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(not(releaseType)));
+		final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
+		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
 	}
 
 	@Test
@@ -97,7 +96,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			executorWithTrackedPartition,
-			createResultPartitionDeploymentDescriptor(new ResultPartitionID(), ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(new ResultPartitionID(), true));
 
 		assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition), is(true));
 		assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition), is(false));
@@ -127,10 +126,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
 		partitionTracker.startTrackingPartition(
 			taskExecutorId2,
-			createResultPartitionDeploymentDescriptor(resultPartitionId2, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId2, true));
 
 		{
 			partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -183,10 +182,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, false));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, false));
 		partitionTracker.startTrackingPartition(
 			taskExecutorId2,
-			createResultPartitionDeploymentDescriptor(resultPartitionId2, ShuffleDescriptor.ReleaseType.MANUAL, false));
+			createResultPartitionDeploymentDescriptor(resultPartitionId2, false));
 
 		{
 			partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -227,7 +226,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
 
 		partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);
 
@@ -236,15 +235,21 @@ public class PartitionTrackerImplTest extends TestLogger {
 	}
 
 	private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
+			ResultPartitionID resultPartitionId,
+			boolean hasLocalResources) {
+		return createResultPartitionDeploymentDescriptor(resultPartitionId, ResultPartitionType.BLOCKING, hasLocalResources);
+	}
+
+	private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
 		ResultPartitionID resultPartitionId,
-		ShuffleDescriptor.ReleaseType releaseType,
+		ResultPartitionType type,
 		boolean hasLocalResources) {
 
 		return new ResultPartitionDeploymentDescriptor(
 			new PartitionDescriptor(
 				new IntermediateDataSetID(),
 				resultPartitionId.getPartitionId(),
-				ResultPartitionType.BLOCKING,
+				type,
 				1,
 				0),
 			new ShuffleDescriptor() {
@@ -259,15 +264,9 @@ public class PartitionTrackerImplTest extends TestLogger {
 						? Optional.of(ResourceID.generate())
 						: Optional.empty();
 				}
-
-				@Override
-				public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-					return EnumSet.of(releaseType);
-				}
 			},
 			1,
-			true,
-			releaseType);
+			true);
 	}
 
 	private static TaskExecutorGateway createTaskExecutorGateway(ResourceID taskExecutorId, Collection<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> releaseCalls) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 8eb68d3..27eaab8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -142,7 +142,8 @@ public class ResultPartitionBuilder {
 			blockingSubpartitionType,
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate,
-			networkBufferSize);
+			networkBufferSize,
+			releasedOnConsumption);
 
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() ->
 			resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType));
@@ -153,7 +154,6 @@ public class ResultPartitionBuilder {
 			partitionType,
 			numberOfSubpartitions,
 			numTargetKeyGroups,
-			releasedOnConsumption,
 			factory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index b2a4d16..8065829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
@@ -57,18 +56,26 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	}
 
 	@Test
-	public void testConsumptionOnReleaseEnabled() {
-		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
+	public void testConsumptionOnReleaseForced() {
+		final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING);
+		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
+	}
+
+	@Test
+	public void testConsumptionOnReleaseEnabledForNonBlocking() {
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
 		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
 	}
 
 	@Test
 	public void testConsumptionOnReleaseDisabled() {
-		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.MANUAL);
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
 		assertThat(resultPartition, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
 	}
 
-	private static ResultPartition createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
+	private static ResultPartition createResultPartition(
+			boolean releasePartitionOnConsumption,
+			ResultPartitionType partitionType) {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
 			fileChannelManager,
@@ -76,9 +83,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			BoundedBlockingSubpartitionType.AUTO,
 			1,
 			1,
-			64);
+			64,
+			releasePartitionOnConsumption);
 
-		ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
 		final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(
 			new PartitionDescriptor(
 				new IntermediateDataSetID(),
@@ -86,10 +93,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
 				partitionType,
 				1,
 				0),
-			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
+			NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
 			1,
-			true,
-			releaseType
+			true
 		);
 
 		return factory.create("test", descriptor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index f5999e9..d87cf84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -57,7 +57,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -217,7 +216,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		boolean waitForRelease) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-			PartitionTestUtils.createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType.MANUAL);
+			PartitionTestUtils.createPartitionDeploymentDescriptor();
 		final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
 		final TaskDeploymentDescriptor taskDeploymentDescriptor =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 0ecd81a..2d58d03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -40,8 +40,6 @@ public class NettyShuffleDescriptorBuilder {
 	private InetAddress address = InetAddress.getLoopbackAddress();
 	private int dataPort = 0;
 	private int connectionIndex = 0;
-	private boolean isBlocking;
-
 	public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
 		this.producerLocation = producerLocation;
 		return this;
@@ -74,26 +72,19 @@ public class NettyShuffleDescriptorBuilder {
 		return this;
 	}
 
-	public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
-		this.isBlocking = isBlocking;
-		return this;
-	}
-
 	public NettyShuffleDescriptor buildRemote() {
 		ConnectionID connectionID = new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			id,
-			isBlocking);
+			id);
 	}
 
 	public NettyShuffleDescriptor buildLocal() {
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			LocalExecutionPartitionConnectionInfo.INSTANCE,
-			id,
-			isBlocking);
+			id);
 	}
 
 	public static NettyShuffleDescriptorBuilder newBuilder() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 1549864..9e90c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -58,7 +58,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.configuration.JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
+import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
 import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;