You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:43:57 UTC

[flink] branch release-1.9 updated (32dd1b1 -> b5ab84c)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 32dd1b1  [FLINK-13427][hive] HiveCatalog's createFunction fails when function name has upper-case characters
     new ddaeffa  [hotfix][tests] Remove setting the default value of force-release-on-consumption
     new b4d7bff  [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type
     new a7869bc  [hotfix][network] fix codestyle issues in ResultPartitionFactory
     new 58e0ed2  [hotfix][network] Annotate NettyShuffleDescriptor#PartitionConnectionInfo with @FunctionalInterface
     new 18baca7  [hotfix][network] fix codestyle issues in NettyShuffleMaster
     new 355a80a  [hotfix] fix codestyle issues in ShuffleDescriptor
     new 0b2161a  [hotfix][tests] Make PartitionTestUtils enum singleton and fix codestyle
     new 9201cb8  [hotfix][tests] fix codestyle issues in ResultPartitionBuilder
     new 06468ef  [hotfix][tests] fix codestyle issues in ResultPartitionFactoryTest
     new 95d6d41  [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentBuilder
     new 0bbeedf  [hotfix][tests] fix codestyle issues in NettyShuffleDescriptorBuilder
     new c422c71  [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentConfiguration
     new 18c3686  [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based on ResultPartitionType.isBlocking
     new 039ec02  [hotfix][coordination] Check whether partition set to track is empty
     new b5ab84c  [FLINK-13371][coordination] Prevent leaks of blocking partitions

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/configuration/JobManagerOptions.java     |   5 -
 .../NettyShuffleEnvironmentOptions.java            |   5 +
 .../test-scripts/test_ha_dataset.sh                |   1 -
 .../ResultPartitionDeploymentDescriptor.java       |  57 ----
 .../flink/runtime/executiongraph/Execution.java    |  80 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |  12 +-
 .../executiongraph/ExecutionGraphBuilder.java      |   4 -
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/NettyShuffleServiceFactory.java     |   3 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |  11 +-
 .../network/partition/ResultPartitionFactory.java  |  84 +++---
 .../runtime/shuffle/NettyShuffleDescriptor.java    |  20 +-
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |   6 +-
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   |  38 +--
 .../flink/runtime/shuffle/ShuffleEnvironment.java  |  26 +-
 .../flink/runtime/shuffle/ShuffleMaster.java       |   5 -
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |   6 -
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   4 +-
 .../taskexecutor/partition/PartitionTable.java     |   4 +
 .../NettyShuffleEnvironmentConfiguration.java      |  23 +-
 .../ResultPartitionDeploymentDescriptorTest.java   |  15 +-
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/NettyShuffleEnvironmentBuilder.java |  52 +---
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |  67 +----
 .../partition/PartitionTrackerImplTest.java        |  46 ++-
 .../network/partition/ResultPartitionBuilder.java  |   9 +-
 .../partition/ResultPartitionFactoryTest.java      |  44 ++-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  52 +++-
 .../taskexecutor/partition/PartitionTableTest.java |   9 +
 .../util/NettyShuffleDescriptorBuilder.java        |  17 +-
 .../recovery/BatchFineGrainedRecoveryITCase.java   |   2 +-
 37 files changed, 646 insertions(+), 541 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java


[flink] 05/15: [hotfix][network] fix codestyle issues in NettyShuffleMaster

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18baca7089a04bcb1ee071e1247f512874bef16d
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:30:44 2019 +0300

    [hotfix][network] fix codestyle issues in NettyShuffleMaster
---
 .../main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 6c2cb32..50c11cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -51,7 +52,7 @@ public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 	public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
 	}
 
-	private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(
+	private static PartitionConnectionInfo createConnectionInfo(
 			ProducerDescriptor producerDescriptor,
 			int connectionIndex) {
 		return producerDescriptor.getDataPort() >= 0 ?


[flink] 12/15: [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentConfiguration

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c422c71446d06d31e4f9c134c49562c551d1ba5d
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Jul 31 10:52:00 2019 +0300

    [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentConfiguration
---
 .../flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 38817a6..36d3423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -29,8 +29,8 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
-
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 


[flink] 06/15: [hotfix] fix codestyle issues in ShuffleDescriptor

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 355a80a7f4e79cc4b9400077afb12d2f4f941447
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:32:07 2019 +0300

    [hotfix] fix codestyle issues in ShuffleDescriptor
---
 .../main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 17feacb..8282630 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -41,14 +41,14 @@ public interface ShuffleDescriptor extends Serializable {
 	 * that the producer of the partition (consumer input channel) has not been scheduled
 	 * and its location and other relevant data is yet to be defined.
 	 * To proceed with the consumer deployment, currently unknown input channels have to be
-	 * marked with placeholders which are special implementation of {@link ShuffleDescriptor}:
+	 * marked with placeholders. The placeholder is a special implementation of the shuffle descriptor:
 	 * {@link UnknownShuffleDescriptor}.
 	 *
 	 * <p>Note: this method is not supposed to be overridden in concrete shuffle implementation.
 	 * The only class where it returns {@code true} is {@link UnknownShuffleDescriptor}.
 	 *
 	 * @return whether the partition producer has been ever deployed and
-	 * the corresponding {@link ShuffleDescriptor} is obtained from the {@link ShuffleMaster} implementation.
+	 * the corresponding shuffle descriptor is obtained from the {@link ShuffleMaster} implementation.
 	 */
 	default boolean isUnknown() {
 		return false;


[flink] 07/15: [hotfix][tests] Make PartitionTestUtils enum singleton and fix codestyle

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b2161a4cb8874c9ade1f32846e8867aa09db603
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:36:07 2019 +0300

    [hotfix][tests] Make PartitionTestUtils enum singleton and fix codestyle
---
 .../runtime/io/network/partition/PartitionTestUtils.java      | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index cf50051..72892d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -39,7 +40,8 @@ import static org.junit.Assert.fail;
  * While using Mockito internally (for now), the use of Mockito should not
  * leak out of this class.
  */
-public class PartitionTestUtils {
+public enum PartitionTestUtils {
+	;
 
 	public static ResultPartition createPartition() {
 		return createPartition(ResultPartitionType.PIPELINED_BOUNDED);
@@ -83,7 +85,7 @@ public class PartitionTestUtils {
 	}
 
 	static void verifyCreateSubpartitionViewThrowsException(
-			ResultPartitionManager partitionManager,
+			ResultPartitionProvider partitionManager,
 			ResultPartitionID partitionId) throws IOException {
 		try {
 			partitionManager.createSubpartitionView(partitionId, 0, new NoOpBufferAvailablityListener());
@@ -114,7 +116,10 @@ public class PartitionTestUtils {
 		return createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
 	}
 
-	public static void writeBuffers(ResultPartition partition, int numberOfBuffers, int bufferSize) throws IOException {
+	public static void writeBuffers(
+			ResultPartitionWriter partition,
+			int numberOfBuffers,
+			int bufferSize) throws IOException {
 		for (int i = 0; i < numberOfBuffers; i++) {
 			partition.addBufferConsumer(createFilledBufferConsumer(bufferSize, bufferSize), 0);
 		}


[flink] 01/15: [hotfix][tests] Remove setting the default value of force-release-on-consumption

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ddaeffacd706ea9fc075823bff3640e3b57ca0c2
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:14:49 2019 +0300

    [hotfix][tests] Remove setting the default value of force-release-on-consumption
---
 flink-end-to-end-tests/test-scripts/test_ha_dataset.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b547142..db7667b 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -75,7 +75,6 @@ function setup_and_start_cluster() {
     create_ha_config
 
     set_config_key "jobmanager.execution.failover-strategy" "region"
-    set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false"
     set_config_key "taskmanager.numberOfTaskSlots" "1"
 
     set_config_key "restart-strategy" "fixed-delay"


[flink] 08/15: [hotfix][tests] fix codestyle issues in ResultPartitionBuilder

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9201cb8369538654bde7da225faa881a17e75f10
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:39:52 2019 +0300

    [hotfix][tests] fix codestyle issues in ResultPartitionBuilder
---
 .../flink/runtime/io/network/partition/ResultPartitionBuilder.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 27eaab8..4bb1a85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -113,7 +113,7 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder setNetworkBufferSize(int networkBufferSize) {
+	ResultPartitionBuilder setNetworkBufferSize(int networkBufferSize) {
 		this.networkBufferSize = networkBufferSize;
 		return this;
 	}
@@ -129,7 +129,8 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType blockingSubpartitionType) {
+	ResultPartitionBuilder setBoundedBlockingSubpartitionType(
+			@SuppressWarnings("SameParameterValue") BoundedBlockingSubpartitionType blockingSubpartitionType) {
 		this.blockingSubpartitionType = blockingSubpartitionType;
 		return this;
 	}


[flink] 04/15: [hotfix][network] Annotate NettyShuffleDescriptor#PartitionConnectionInfo with @FunctionalInterface

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58e0ed2ab39d216495fb6dd06598a9717b3edc02
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:29:34 2019 +0300

    [hotfix][network] Annotate NettyShuffleDescriptor#PartitionConnectionInfo with @FunctionalInterface
---
 .../java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index f758bcc..cf58b97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -70,6 +70,7 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 	/**
 	 * Information for connection to partition producer for shuffle exchange.
 	 */
+	@FunctionalInterface
 	public interface PartitionConnectionInfo extends Serializable {
 		ConnectionID getConnectionId();
 	}


[flink] 15/15: [FLINK-13371][coordination] Prevent leaks of blocking partitions

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5ab84c6238ff1f69f2151ed580410ae4c63acd7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 1 14:27:28 2019 +0200

    [FLINK-13371][coordination] Prevent leaks of blocking partitions
---
 .../flink/runtime/executiongraph/Execution.java    |  71 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |   7 +
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |   2 +-
 .../partition/PartitionTrackerImplTest.java        |   7 +-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  51 +++-
 14 files changed, 456 insertions(+), 164 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af94188..d8a1b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
-		processFail(t, true, userAccumulators, metrics);
+		// skip release of partitions since this is only called if the TM actually sent the FAILED state update
+		// in this case all partitions have already been cleaned up
+		processFail(t, true, userAccumulators, metrics, false);
 	}
 
 	@VisibleForTesting
@@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			else if (current == CANCELING) {
 				// we sent a cancel call, and the task manager finished before it arrived. We
 				// will never get a CANCELED call back from the job manager
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return;
 			}
 			else if (current == CANCELED || current == FAILED) {
@@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void completeCancelling() {
-		completeCancelling(null, null);
+		completeCancelling(null, null, true);
 	}
 
-	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
 		// network stack is canceled (for example by a failing / canceling receiver or sender
@@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				if (transitionState(current, CANCELED)) {
-					finishCancellation();
+					finishCancellation(releasePartitions);
 					return;
 				}
 
@@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	private void finishCancellation() {
+	private void finishCancellation(boolean releasePartitions) {
 		releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
 		vertex.getExecutionGraph().deregisterExecution(this);
-		// release partitions on TM in case the Task finished while we where already CANCELING
-		stopTrackingAndReleasePartitions();
+		handlePartitionCleanup(releasePartitions, releasePartitions);
 	}
 
 	void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// --------------------------------------------------------------------------------------------
 
 	private boolean processFail(Throwable t, boolean isCallback) {
-		return processFail(t, isCallback, null, null);
+		return processFail(t, isCallback, null, null, true);
 	}
 
-	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 
@@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 
 			if (current == CANCELING) {
-				completeCancelling(userAccumulators, metrics);
+				completeCancelling(userAccumulators, metrics, true);
 				return false;
 			}
 
@@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				releaseAssignedResource(t);
 				vertex.getExecutionGraph().deregisterExecution(this);
-				stopTrackingAndReleasePartitions();
+				handlePartitionCleanup(releasePartitions, releasePartitions);
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
 					if (LOG.isDebugEnabled()) {
@@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	void stopTrackingAndReleasePartitions() {
+	void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
+		if (releasePipelinedPartitions) {
+			sendReleaseIntermediateResultPartitionsRpcCall();
+		}
+
+		final Collection<ResultPartitionID> partitionIds = getPartitionIds();
+		final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
+
+		if (!partitionIds.isEmpty()) {
+			if (releaseBlockingPartitions) {
+				LOG.info("Discarding the results produced by task execution {}.", attemptId);
+				partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+			} else {
+				partitionTracker.stopTrackingPartitions(partitionIds);
+			}
+		}
+	}
+
+	private Collection<ResultPartitionID> getPartitionIds() {
+		return producedPartitions.values().stream()
+			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+			.map(ShuffleDescriptor::getResultPartitionID)
+			.collect(Collectors.toList());
+	}
+
+	private void sendReleaseIntermediateResultPartitionsRpcCall() {
 		LOG.info("Discarding the results produced by task execution {}.", attemptId);
-		if (producedPartitions != null && producedPartitions.size() > 0) {
-			final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
-			final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream()
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			final ShuffleMaster<?> shuffleMaster = getVertex().getExecutionGraph().getShuffleMaster();
+
+			Collection<ResultPartitionID> partitionIds = producedPartitions.values().stream()
+				.filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined())
 				.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+				.peek(shuffleMaster::releasePartitionExternally)
 				.map(ShuffleDescriptor::getResultPartitionID)
 				.collect(Collectors.toList());
 
-			partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
+			if (!partitionIds.isEmpty()) {
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
+			}
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f779fd2..0984274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1525,7 +1525,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			case CANCELED:
 				// this deserialization is exception-free
 				accumulators = deserializeAccumulators(state);
-				attempt.completeCancelling(accumulators, state.getIOMetrics());
+				attempt.completeCancelling(accumulators, state.getIOMetrics(), false);
 				return true;
 
 			case FAILED:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 03c68f8..6d262ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -604,7 +604,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 			if (oldState.isTerminal()) {
 				if (oldState == FINISHED) {
-					oldExecution.stopTrackingAndReleasePartitions();
+					// pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
+					// do not release pipelined partitions here to save RPC calls
+					oldExecution.handlePartitionCleanup(false, true);
 					getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
 				}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 3697fb8..268f4f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -46,6 +46,11 @@ public interface PartitionTracker {
 	void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds);
 
 	/**
+	 * Stops the tracking of the given partitions.
+	 */
+	void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds);
+
+	/**
 	 * Releases all partitions for the given task executor ID, and stop the tracking of partitions that were released.
 	 */
 	void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index f772b37..2e7f421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		Preconditions.checkNotNull(resultPartitionIds);
+
+		resultPartitionIds.forEach(this::internalStopTrackingPartition);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		Preconditions.checkNotNull(producingTaskExecutorId);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
index ad3fc48..ec32178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
@@ -63,11 +63,11 @@ public class ProducerDescriptor {
 		this.dataPort = dataPort;
 	}
 
-	ResourceID getProducerLocation() {
+	public ResourceID getProducerLocation() {
 		return producerLocation;
 	}
 
-	ExecutionAttemptID getProducerExecutionId() {
+	public ExecutionAttemptID getProducerExecutionId() {
 		return producerExecutionId;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d0cfa00..d10fcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		Execution execution1 = executions.values().iterator().next();
 		execution1.cancel();
-		execution1.completeCancelling(accumulators, ioMetrics);
+		execution1.completeCancelling(accumulators, ioMetrics, false);
 
 		assertEquals(ioMetrics, execution1.getIOMetrics());
 		assertEquals(accumulators, execution1.getUserAccumulators());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
new file mode 100644
index 0000000..ab4048d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link Execution}.
+ */
+public class ExecutionPartitionLifecycleTest extends TestLogger {
+
+	@ClassRule
+	public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
+		new TestingComponentMainThreadExecutor.Resource();
+
+	private Execution execution;
+	private ResultPartitionDeploymentDescriptor descriptor;
+	private ResourceID taskExecutorResourceId;
+	private JobID jobId;
+
+	@Test
+	public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
+		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>();
+		taskManagerGateway.setReleasePartitionsConsumer(((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds))));
+
+		final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
+
+		execution.cancel();
+		assertFalse(releasePartitionsCallFuture.isDone());
+
+		execution.markFinished();
+		assertTrue(releasePartitionsCallFuture.isDone());
+
+		final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get();
+		assertEquals(jobId, releasePartitionsCall.f0);
+		assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1);
+
+		assertEquals(1, testingShuffleMaster.externallyReleasedPartitions.size());
+		assertEquals(descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll());
+	}
+
+	private enum PartitionReleaseResult {
+		NONE,
+		STOP_TRACKING,
+		STOP_TRACKING_AND_RELEASE
+	}
+
+	@Test
+	public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
+		testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+			},
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> {
+				execution.cancel();
+				execution.completeCancelling();
+			},
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(
+				new Exception("Test exception"),
+				Collections.emptyMap(),
+				new IOMetrics(0, 0, 0, 0)),
+			PartitionReleaseResult.STOP_TRACKING);
+	}
+
+	@Test
+	public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
+		testPartitionTrackingForStateTransition(
+			execution -> execution.markFailed(new Exception("Test exception")),
+			PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+	}
+
+	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final PartitionReleaseResult partitionReleaseResult) throws Exception {
+		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture = new CompletableFuture<>();
+		CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setStartTrackingPartitionsConsumer(
+			(resourceID, resultPartitionDeploymentDescriptor) ->
+				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
+		);
+		partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
+
+		setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE);
+
+		Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
+		assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
+		assertThat(startTrackingCall.f1, equalTo(descriptor));
+
+		stateTransition.accept(execution);
+
+		switch (partitionReleaseResult) {
+			case NONE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				break;
+			case STOP_TRACKING:
+				assertTrue(partitionStopTrackingFuture.isDone());
+				assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingCall = partitionStopTrackingFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingCall);
+				break;
+			case STOP_TRACKING_AND_RELEASE:
+				assertFalse(partitionStopTrackingFuture.isDone());
+				assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+				final Collection<ResultPartitionID> stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get();
+				assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseCall);
+				break;
+		}
+	}
+
+	private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
+		final JobVertex producerVertex = createNoOpJobVertex();
+		final JobVertex consumerVertex = createNoOpJobVertex();
+		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotProvider slotProvider = new SlotProvider() {
+			@Override
+			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
+				return CompletableFuture.completedFuture(new SimpleSlot(
+					new SingleSlotTestingSlotOwner(),
+					taskManagerLocation,
+					0,
+					taskManagerGateway));
+			}
+
+			@Override
+			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+			}
+		};
+
+		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+			null,
+			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
+			new Configuration(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			slotProvider,
+			ExecutionPartitionLifecycleTest.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			VoidBlobWriter.getInstance(),
+			Time.seconds(10),
+			log,
+			shuffleMaster,
+			partitionTracker);
+
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
+		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+		execution = executionVertex.getCurrentExecutionAttempt();
+
+		execution.allocateResourcesForExecution(
+			executionGraph.getSlotProviderStrategy(),
+			LocationPreferenceConstraint.ALL,
+			Collections.emptySet());
+
+		execution.deploy();
+		execution.switchToRunning();
+
+		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
+			.getProducedDataSets()[0]
+			.getPartitions()[0]
+			.getPartitionId();
+
+		descriptor = execution
+			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
+		taskExecutorResourceId = taskManagerLocation.getResourceID();
+		jobId = executionGraph.getJobID();
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	/**
+	 * Slot owner which records the first returned slot.
+	 */
+	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
+
+		final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
+
+		@Override
+		public void returnLogicalSlot(LogicalSlot logicalSlot) {
+			returnedSlot.complete(logicalSlot);
+		}
+	}
+
+	private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
+
+		final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
+
+		@Override
+		public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
+			return CompletableFuture.completedFuture(new ShuffleDescriptor() {
+				@Override
+				public ResultPartitionID getResultPartitionID() {
+					return new ResultPartitionID(
+						partitionDescriptor.getPartitionId(),
+						producerDescriptor.getProducerExecutionId());
+				}
+
+				@Override
+				public Optional<ResourceID> storesLocalResourcesOn() {
+					return Optional.of(producerDescriptor.getProducerLocation());
+				}
+			});
+		}
+
+		@Override
+		public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+			externallyReleasedPartitions.add(shuffleDescriptor);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index b8c83fe..433e3f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,33 +19,16 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +46,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,9 +54,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger {
 		assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
 	}
 
-	@Test
-	public void testPartitionRetainedWhenFinished() throws Exception {
-		testPartitionTrackingForStateTransition(Execution::markFinished, false);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenCanceled() throws Exception {
-		testPartitionTrackingForStateTransition(
-			execution -> {
-				execution.cancel();
-				execution.completeCancelling();
-			},
-			true);
-	}
-
-	@Test
-	public void testPartitionReleasedWhenFailed() throws Exception {
-		testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true);
-	}
-
-	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception {
-		final JobVertex producerVertex = createNoOpJobVertex();
-		final JobVertex consumerVertex = createNoOpJobVertex();
-		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-
-		final SlotProvider slotProvider = new SlotProvider() {
-			@Override
-			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
-				return CompletableFuture.completedFuture(new SimpleSlot(
-					new SingleSlotTestingSlotOwner(),
-					taskManagerLocation,
-					0,
-					new SimpleAckingTaskManagerGateway()));
-			}
-
-			@Override
-			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-			}
-		};
-
-		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
-		CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>();
-		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
-		partitionTracker.setStartTrackingPartitionsConsumer(
-			(resourceID, resultPartitionDeploymentDescriptor) ->
-				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
-		);
-		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);
-
-		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
-			new Configuration(),
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			slotProvider,
-			ExecutionTest.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(10),
-			new NoRestartStrategy(),
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			Time.seconds(10),
-			log,
-			NettyShuffleMaster.INSTANCE,
-			partitionTracker);
-
-		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
-		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
-
-		final Execution execution = executionVertex.getCurrentExecutionAttempt();
-
-		execution.allocateResourcesForExecution(
-			executionGraph.getSlotProviderStrategy(),
-			LocationPreferenceConstraint.ALL,
-			Collections.emptySet());
-
-		assertThat(partitionStartTrackingFuture.isDone(), is(true));
-		final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
-
-		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
-			.getProducedDataSets()[0]
-			.getPartitions()[0]
-			.getPartitionId();
-		final ResultPartitionDeploymentDescriptor descriptor = execution
-			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
-		assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID()));
-		assertThat(startTrackingCall.f1, equalTo(descriptor));
-
-		execution.deploy();
-		execution.switchToRunning();
-
-		stateTransition.accept(execution);
-
-		assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased));
-		if (shouldPartitionBeReleased) {
-			final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get();
-			assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
-		}
-	}
-
 	/**
 	 * Tests that a slot release will atomically release the assigned {@link Execution}.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 0d1a160..2888b31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 72892d6..b1a58a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -96,7 +96,7 @@ public enum PartitionTestUtils {
 		}
 	}
 
-	static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
 		ResultPartitionType partitionType) {
 		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 5ca7156..63e4f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals;
 public class PartitionTrackerImplTest extends TestLogger {
 
 	@Test
-	public void testReleasedOnConsumptionPartitionIsNotTracked() {
+	public void testPipelinedPartitionIsNotTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
 	}
 
 	@Test
-	public void testRetainedOnConsumptionPartitionIsTracked() {
+	public void testBlockingPartitionIsTracked() {
 		testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
 	}
 
@@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 				resultPartitionType,
 				false));
 
-		final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
-		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
+		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking()));
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2d85ff6..6ba333d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -36,6 +36,7 @@ public class TestingPartitionTracker implements PartitionTracker {
 	private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
 	private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
 	private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+	private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {};
 
 	public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
 		this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer;
@@ -61,6 +62,10 @@ public class TestingPartitionTracker implements PartitionTracker {
 		this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer;
 	}
 
+	public void setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) {
+		this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer;
+	}
+
 	@Override
 	public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
 		this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor);
@@ -77,6 +82,11 @@ public class TestingPartitionTracker implements PartitionTracker {
 	}
 
 	@Override
+	public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+		stopTrackingPartitionsConsumer.accept(resultPartitionIds);
+	}
+
+	@Override
 	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
 		stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d87cf84..6b6b9e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -90,6 +91,7 @@ import java.util.stream.StreamSupport;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@Test
-	public void testPartitionReleaseAfterDisconnect() throws Exception {
+	public void testBlockingPartitionReleaseAfterDisconnect() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+	public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
+			false,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception {
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
-			true);
+			true,
+			ResultPartitionType.BLOCKING);
 	}
 
 	@Test
-	public void testPartitionReleaseAfterShutdown() throws Exception {
+	public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception {
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
+			true,
+			ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testBlockingPartitionReleaseAfterShutdown() throws Exception {
 		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
 		testPartitionRelease(
 			(jobId, partitionId, taskExecutorGateway) -> { },
-			false);
+			false,
+			ResultPartitionType.BLOCKING);
+	}
+
+	@Test
+	public void testPipelinedPartitionReleaseAfterShutdown() throws Exception {
+		// don't do any explicit release action, so that the partition must be cleaned up on shutdown
+		testPartitionRelease(
+			(jobId, partitionId, taskExecutorGateway) -> { },
+			false,
+			ResultPartitionType.PIPELINED);
 	}
 
 	private void testPartitionRelease(
 		TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releaseAction,
-		boolean waitForRelease) throws Exception {
+		boolean waitForRelease,
+		ResultPartitionType resultPartitionType) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-			PartitionTestUtils.createPartitionDeploymentDescriptor();
+			PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
 		final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
 		final TaskDeploymentDescriptor taskDeploymentDescriptor =
@@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is still running => the partition is in in-progress
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			TestingInvokable.sync.releaseBlocker();
 			taskFinishedFuture.get(timeout.getSize(), timeout.getUnit());
@@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			// the task is finished => the partition should be finished now
 			runInTaskExecutorThreadAndWait(
 				taskExecutor,
-				() -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+				() -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
 
 			final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 			runInTaskExecutorThreadAndWait(


[flink] 11/15: [hotfix][tests] fix codestyle issues in NettyShuffleDescriptorBuilder

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0bbeedf4b97eb69adb76449e92cd86e5fe42b989
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:49:24 2019 +0300

    [hotfix][tests] fix codestyle issues in NettyShuffleDescriptorBuilder
---
 .../org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 2d58d03..052fefb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -38,8 +38,8 @@ public class NettyShuffleDescriptorBuilder {
 	private ResourceID producerLocation = ResourceID.generate();
 	private ResultPartitionID id = new ResultPartitionID();
 	private InetAddress address = InetAddress.getLoopbackAddress();
-	private int dataPort = 0;
-	private int connectionIndex = 0;
+	private int dataPort;
+	private int connectionIndex;
 	public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
 		this.producerLocation = producerLocation;
 		return this;


[flink] 10/15: [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentBuilder

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95d6d41c710ffbfcc200e3193b840fddf515cb79
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:48:28 2019 +0300

    [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentBuilder
---
 .../io/network/NettyShuffleEnvironmentBuilder.java | 49 +++++-----------------
 1 file changed, 11 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 96b6330..4f0ecf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -33,38 +33,30 @@ import java.time.Duration;
  */
 public class NettyShuffleEnvironmentBuilder {
 
-	public static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
+	private static final int DEFAULT_NETWORK_BUFFER_SIZE = 32 << 10;
+	private static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
 
-	private static final String[] DEFAULT_TEMP_DIRS = new String[] {EnvironmentInformation.getTemporaryFileDirectory()};
+	private static final String[] DEFAULT_TEMP_DIRS = {EnvironmentInformation.getTemporaryFileDirectory()};
+	private static final Duration DEFAULT_REQUEST_SEGMENTS_TIMEOUT = Duration.ofMillis(30000L);
 
 	private int numNetworkBuffers = DEFAULT_NUM_NETWORK_BUFFERS;
 
-	private int networkBufferSize = 32 * 1024;
+	private int partitionRequestInitialBackoff;
 
-	private int partitionRequestInitialBackoff = 0;
-
-	private int partitionRequestMaxBackoff = 0;
+	private int partitionRequestMaxBackoff;
 
 	private int networkBuffersPerChannel = 2;
 
 	private int floatingNetworkBuffersPerGate = 8;
 
-	private Duration requestSegmentsTimeout = Duration.ofMillis(30000L);
-
 	private boolean isCreditBased = true;
 
-	private boolean isNetworkDetailedMetrics = false;
-
 	private ResourceID taskManagerLocation = ResourceID.generate();
 
 	private NettyConfig nettyConfig;
 
-	private TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
-
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-	private String[] tempDirs = DEFAULT_TEMP_DIRS;
-
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
 		return this;
@@ -75,11 +67,6 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
-	public NettyShuffleEnvironmentBuilder setNetworkBufferSize(int networkBufferSize) {
-		this.networkBufferSize = networkBufferSize;
-		return this;
-	}
-
 	public NettyShuffleEnvironmentBuilder setPartitionRequestInitialBackoff(int partitionRequestInitialBackoff) {
 		this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
 		return this;
@@ -100,10 +87,6 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
-	public void setRequestSegmentsTimeout(Duration requestSegmentsTimeout) {
-		this.requestSegmentsTimeout = requestSegmentsTimeout;
-	}
-
 	public NettyShuffleEnvironmentBuilder setIsCreditBased(boolean isCreditBased) {
 		this.isCreditBased = isCreditBased;
 		return this;
@@ -114,39 +97,29 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
-	public NettyShuffleEnvironmentBuilder setTaskEventDispatcher(TaskEventDispatcher taskEventDispatcher) {
-		this.taskEventDispatcher = taskEventDispatcher;
-		return this;
-	}
-
 	public NettyShuffleEnvironmentBuilder setMetricGroup(MetricGroup metricGroup) {
 		this.metricGroup = metricGroup;
 		return this;
 	}
 
-	public NettyShuffleEnvironmentBuilder setTempDirs(String[] tempDirs) {
-		this.tempDirs = tempDirs;
-		return this;
-	}
-
 	public NettyShuffleEnvironment build() {
 		return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
 			new NettyShuffleEnvironmentConfiguration(
 				numNetworkBuffers,
-				networkBufferSize,
+				DEFAULT_NETWORK_BUFFER_SIZE,
 				partitionRequestInitialBackoff,
 				partitionRequestMaxBackoff,
 				networkBuffersPerChannel,
 				floatingNetworkBuffersPerGate,
-				requestSegmentsTimeout,
+				DEFAULT_REQUEST_SEGMENTS_TIMEOUT,
 				isCreditBased,
-				isNetworkDetailedMetrics,
+				false,
 				nettyConfig,
-				tempDirs,
+				DEFAULT_TEMP_DIRS,
 				BoundedBlockingSubpartitionType.AUTO,
 				false),
 			taskManagerLocation,
-			taskEventDispatcher,
+			new TaskEventDispatcher(),
 			metricGroup);
 	}
 }


[flink] 09/15: [hotfix][tests] fix codestyle issues in ResultPartitionFactoryTest

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06468ef3af4d2dd7e0ff96df96be278695d532ae
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:43:41 2019 +0300

    [hotfix][tests] fix codestyle issues in ResultPartitionFactoryTest
---
 .../runtime/io/network/partition/ResultPartitionFactoryTest.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 8065829..1c8591f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -39,9 +39,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /**
  * Tests for the {@link ResultPartitionFactory}.
  */
+@SuppressWarnings("StaticVariableUsedBeforeInitialization")
 public class ResultPartitionFactoryTest extends TestLogger {
 
 	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+	private static final int SEGMENT_SIZE = 64;
 
 	private static FileChannelManager fileChannelManager;
 
@@ -79,11 +81,11 @@ public class ResultPartitionFactoryTest extends TestLogger {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
 			fileChannelManager,
-			new NetworkBufferPool(1, 64, 1),
+			new NetworkBufferPool(1, SEGMENT_SIZE, 1),
 			BoundedBlockingSubpartitionType.AUTO,
 			1,
 			1,
-			64,
+			SEGMENT_SIZE,
 			releasePartitionOnConsumption);
 
 		final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(


[flink] 14/15: [hotfix][coordination] Check whether partition set to track is empty

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 039ec028809863712509b785aa5e2aff19a5018b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 26 12:49:45 2019 +0200

    [hotfix][coordination] Check whether partition set to track is empty
---
 .../flink/runtime/taskexecutor/partition/PartitionTable.java     | 4 ++++
 .../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++
 2 files changed, 13 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 02942cf..d214e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -51,6 +51,10 @@ public class PartitionTable<K> {
 		Preconditions.checkNotNull(key);
 		Preconditions.checkNotNull(newPartitionIds);
 
+		if (newPartitionIds.isEmpty()) {
+			return;
+		}
+
 		trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> {
 			if (partitionIds == null) {
 				partitionIds = new HashSet<>(8);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index 4e54af1..e2f63fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger {
 	}
 
 	@Test
+	public void testStartTrackingZeroPartitionDoesNotMutateState() {
+		final PartitionTable<JobID> table = new PartitionTable<>();
+
+		table.startTrackingPartitions(JOB_ID, Collections.emptyList());
+
+		assertFalse(table.hasTrackedPartitions(JOB_ID));
+	}
+
+	@Test
 	public void testStopTrackingAllPartitions() {
 		final PartitionTable<JobID> table = new PartitionTable<>();
 


[flink] 02/15: [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4d7bff09a3d4c1816dc235d616753095f626243
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri Jul 26 12:13:23 2019 +0200

    [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type
    
    In a long term we do not need auto-release semantics for blocking (persistent) partition. We expect them always to be released externally by JM and assume they can be consumed multiple times.
    
    The pipelined partitions have always only one consumer and one consumption attempt. Afterwards they can be always released automatically.
    
    ShuffleDescriptor.ReleaseType was introduced to make release semantics more flexible but it is not needed in a long term.
    
    FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be able to fallback to 1.8 behaviour without the partition tracker and JM taking care about blocking partition release. We can make this option specific for NettyShuffleEnvironment which was the only existing shuffle service before. If it is activated then the blocking partition is also auto-released on a consumption attempt as it was before. The fine-grained recovery will just not find the partition after the jo [...]
---
 .../flink/configuration/JobManagerOptions.java     |  5 --
 .../NettyShuffleEnvironmentOptions.java            |  5 ++
 .../ResultPartitionDeploymentDescriptor.java       | 57 ----------------------
 .../flink/runtime/executiongraph/Execution.java    |  9 +---
 .../runtime/executiongraph/ExecutionGraph.java     | 10 ----
 .../executiongraph/ExecutionGraphBuilder.java      |  4 --
 .../io/network/NettyShuffleServiceFactory.java     |  3 +-
 .../io/network/partition/PartitionTrackerImpl.java |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 10 ++--
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 19 +-------
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 34 +------------
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 26 +++++-----
 .../flink/runtime/shuffle/ShuffleMaster.java       |  5 --
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 ---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../NettyShuffleEnvironmentConfiguration.java      | 21 ++++++--
 .../ResultPartitionDeploymentDescriptorTest.java   | 15 +-----
 .../io/network/NettyShuffleEnvironmentBuilder.java |  3 +-
 .../io/network/partition/PartitionTestUtils.java   | 56 ++-------------------
 .../partition/PartitionTrackerImplTest.java        | 43 ++++++++--------
 .../network/partition/ResultPartitionBuilder.java  |  4 +-
 .../partition/ResultPartitionFactoryTest.java      | 26 ++++++----
 .../TaskExecutorPartitionLifecycleTest.java        |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        | 13 +----
 .../recovery/BatchFineGrainedRecoveryITCase.java   |  2 +-
 26 files changed, 103 insertions(+), 287 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index e062829..3643667 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -195,11 +195,6 @@ public class JobManagerOptions {
 			.defaultValue(true)
 			.withDescription("Controls whether partitions should already be released during the job execution.");
 
-	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
-	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-			key("jobmanager.scheduler.partition.force-release-on-consumption")
-			.defaultValue(false);
-
 	// ---------------------------------------------------------------------------------------------
 
 	private JobManagerOptions() {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 4ba4c8e..733085e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -231,6 +231,11 @@ public class NettyShuffleEnvironmentOptions {
 
 	// ------------------------------------------------------------------------
 
+	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
+	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+		key("taskmanager.network.partition.force-release-on-consumption")
+			.defaultValue(false);
+
 	/** Not intended to be instantiated. */
 	private NettyShuffleEnvironmentOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 39b61d2..064c9bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,22 +18,16 @@
 
 package org.apache.flink.runtime.deployment;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import java.io.Serializable;
-import java.util.Collection;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -54,48 +48,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
 	private final boolean sendScheduleOrUpdateConsumersMessage;
 
-	private final ReleaseType releaseType;
-
-	@VisibleForTesting
 	public ResultPartitionDeploymentDescriptor(
 			PartitionDescriptor partitionDescriptor,
 			ShuffleDescriptor shuffleDescriptor,
 			int maxParallelism,
 			boolean sendScheduleOrUpdateConsumersMessage) {
-		this(
-			checkNotNull(partitionDescriptor),
-			shuffleDescriptor,
-			maxParallelism,
-			sendScheduleOrUpdateConsumersMessage,
-			ReleaseType.AUTO);
-	}
-
-	public ResultPartitionDeploymentDescriptor(
-			PartitionDescriptor partitionDescriptor,
-			ShuffleDescriptor shuffleDescriptor,
-			int maxParallelism,
-			boolean sendScheduleOrUpdateConsumersMessage,
-			ReleaseType releaseType) {
-		checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, releaseType);
 		this.partitionDescriptor = checkNotNull(partitionDescriptor);
 		this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
 		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		this.maxParallelism = maxParallelism;
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-		this.releaseType = releaseType;
-	}
-
-	private static void checkReleaseOnConsumptionIsSupportedForPartition(
-			ShuffleDescriptor shuffleDescriptor,
-			ReleaseType releaseType) {
-		checkNotNull(shuffleDescriptor);
-		checkArgument(
-			shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
-			"Release type %s is not supported by the shuffle service for this partition" +
-				"(id: %s), supported release types: %s",
-			releaseType,
-			shuffleDescriptor.getResultPartitionID(),
-			shuffleDescriptor.getSupportedReleaseTypes());
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -126,25 +88,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return sendScheduleOrUpdateConsumersMessage;
 	}
 
-	/**
-	 * Returns whether to release the partition after having been fully consumed once.
-	 *
-	 * <p>Indicates whether the shuffle service should automatically release all partition resources after
-	 * the first full consumption has been acknowledged. This kind of partition does not need to be explicitly released
-	 * by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-	 * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-	 *
-	 * <p>The partition has to support the corresponding {@link ReleaseType} in
-	 * {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
-	 * {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to return {@code true} and
-	 * {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to return {@code false}.
-	 *
-	 * @return whether to release the partition after having been fully consumed once.
-	 */
-	public boolean isReleasedOnConsumption() {
-		return releaseType == ReleaseType.AUTO;
-	}
-
 	@Override
 	public String toString() {
 		return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6679fe1..af94188 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -631,19 +631,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				.getShuffleMaster()
 				.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
 
-			final boolean releasePartitionOnConsumption =
-				vertex.getExecutionGraph().isForcePartitionReleaseOnConsumption()
-				|| !partitionDescriptor.getPartitionType().isBlocking();
-
 			CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration = shuffleDescriptorFuture
 				.thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(
 					partitionDescriptor,
 					shuffleDescriptor,
 					maxParallelism,
-					lazyScheduling,
-					releasePartitionOnConsumption
-						? ShuffleDescriptor.ReleaseType.AUTO
-						: ShuffleDescriptor.ReleaseType.MANUAL));
+					lazyScheduling));
 			partitionRegistrations.add(partitionRegistration);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cc51042..f779fd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -320,8 +320,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	/** Shuffle master to register partitions for task deployment. */
 	private final ShuffleMaster<?> shuffleMaster;
 
-	private boolean forcePartitionReleaseOnConsumption;
-
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -425,7 +423,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			allocationTimeout,
 			new NotReleasingPartitionReleaseStrategy.Factory(),
 			NettyShuffleMaster.INSTANCE,
-			true,
 			new PartitionTrackerImpl(
 				jobInformation.getJobId(),
 				NettyShuffleMaster.INSTANCE,
@@ -448,7 +445,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			Time allocationTimeout,
 			PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
 			ShuffleMaster<?> shuffleMaster,
-			boolean forcePartitionReleaseOnConsumption,
 			PartitionTracker partitionTracker,
 			ScheduleMode scheduleMode,
 			boolean allowQueuedScheduling) throws IOException {
@@ -511,8 +507,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		this.shuffleMaster = checkNotNull(shuffleMaster);
 
-		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
-
 		this.partitionTracker = checkNotNull(partitionTracker);
 
 		this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
@@ -737,10 +731,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return globalModVersion - 1;
 	}
 
-	boolean isForcePartitionReleaseOnConsumption() {
-		return forcePartitionReleaseOnConsumption;
-	}
-
 	@Override
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 018fd81..adbd5fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -161,9 +161,6 @@ public class ExecutionGraphBuilder {
 		final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
 			PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
 
-		final boolean forcePartitionReleaseOnConsumption =
-			jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph;
 		try {
@@ -182,7 +179,6 @@ public class ExecutionGraphBuilder {
 					allocationTimeout,
 					partitionReleaseStrategyFactory,
 					shuffleMaster,
-					forcePartitionReleaseOnConsumption,
 					partitionTracker,
 					jobGraph.getScheduleMode(),
 					jobGraph.getAllowQueuedScheduling());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 431360a..f266c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -104,7 +104,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			config.getBlockingSubpartitionType(),
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate(),
-			config.networkBufferSize());
+			config.networkBufferSize(),
+			config.isForcePartitionReleaseOnConsumption());
 
 		SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
 			taskExecutorResourceId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index c52e8b1..f772b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -64,8 +64,8 @@ public class PartitionTrackerImpl implements PartitionTracker {
 		Preconditions.checkNotNull(producingTaskExecutorId);
 		Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
 
-		// if it is released on consumption we do not need to issue any release calls
-		if (resultPartitionDeploymentDescriptor.isReleasedOnConsumption()) {
+		// only blocking partitions require explicit release call
+		if (!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
 			return;
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 4d5cf23..b390987 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -63,6 +63,8 @@ public class ResultPartitionFactory {
 
 	private final int networkBufferSize;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	public ResultPartitionFactory(
 		@Nonnull ResultPartitionManager partitionManager,
 		@Nonnull FileChannelManager channelManager,
@@ -70,7 +72,8 @@ public class ResultPartitionFactory {
 		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
-		int networkBufferSize) {
+		int networkBufferSize,
+		boolean forcePartitionReleaseOnConsumption) {
 
 		this.partitionManager = partitionManager;
 		this.channelManager = channelManager;
@@ -79,6 +82,7 @@ public class ResultPartitionFactory {
 		this.bufferPoolFactory = bufferPoolFactory;
 		this.blockingSubpartitionType = blockingSubpartitionType;
 		this.networkBufferSize = networkBufferSize;
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 	}
 
 	public ResultPartition create(
@@ -91,7 +95,6 @@ public class ResultPartitionFactory {
 			desc.getPartitionType(),
 			desc.getNumberOfSubpartitions(),
 			desc.getMaxParallelism(),
-			desc.isReleasedOnConsumption(),
 			createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
 	}
 
@@ -102,12 +105,11 @@ public class ResultPartitionFactory {
 		@Nonnull ResultPartitionType type,
 		int numberOfSubpartitions,
 		int maxParallelism,
-		boolean releasePartitionOnConsumption,
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
-		ResultPartition partition = releasePartitionOnConsumption
+		ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
 			? new ReleaseOnConsumptionResultPartition(
 				taskNameWithSubtaskAndId,
 				id,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 8086b27..f758bcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -35,29 +34,19 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 
 	private static final long serialVersionUID = 852181945034989215L;
 
-	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
-		EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);
-
-	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
-		EnumSet.of(ReleaseType.AUTO);
-
 	private final ResourceID producerLocation;
 
 	private final PartitionConnectionInfo partitionConnectionInfo;
 
 	private final ResultPartitionID resultPartitionID;
 
-	private final boolean isBlocking;
-
 	public NettyShuffleDescriptor(
 			ResourceID producerLocation,
 			PartitionConnectionInfo partitionConnectionInfo,
-			ResultPartitionID resultPartitionID,
-			boolean isBlocking) {
+			ResultPartitionID resultPartitionID) {
 		this.producerLocation = producerLocation;
 		this.partitionConnectionInfo = partitionConnectionInfo;
 		this.resultPartitionID = resultPartitionID;
-		this.isBlocking = isBlocking;
 	}
 
 	public ConnectionID getConnectionId() {
@@ -74,12 +63,6 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 		return Optional.of(producerLocation);
 	}
 
-	@Override
-	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-		return isBlocking ?
-			SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
-	}
-
 	public boolean isLocalTo(ResourceID consumerLocation) {
 		return producerLocation.equals(consumerLocation);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index c369ff1..6c2cb32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -42,8 +42,7 @@ public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 		NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor(
 			producerDescriptor.getProducerLocation(),
 			createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()),
-			resultPartitionID,
-			partitionDescriptor.getPartitionType().isBlocking());
+			resultPartitionID);
 
 		return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 5af56f2..17feacb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -19,12 +19,10 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -61,39 +59,11 @@ public interface ShuffleDescriptor extends Serializable {
 	 *
 	 * <p>Indicates that this partition occupies local resources in the producing task executor. Such partition requires
 	 * that the task executor is running and being connected to be able to consume the produced data. This is mostly
-	 * relevant for the batch jobs and blocking result partitions which should outlive the producer lifetime and
-	 * be released externally: {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false}.
+	 * relevant for the batch jobs and blocking result partitions which can outlive the producer lifetime and
+	 * be released externally.
 	 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)} can be used to release such kind of partitions locally.
 	 *
 	 * @return the resource id of the producing task executor if the partition occupies local resources there
 	 */
 	Optional<ResourceID> storesLocalResourcesOn();
-
-	/**
-	 * Return release types supported by Shuffle Service for this partition.
-	 */
-	EnumSet<ReleaseType> getSupportedReleaseTypes();
-
-	/**
-	 * Partition release type.
-	 */
-	enum ReleaseType {
-		/**
-		 * Auto-release the partition after having been fully consumed once.
-		 *
-		 * <p>No additional actions required, like {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-		 * or {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}
-		 */
-		AUTO,
-
-		/**
-		 * Manually release the partition, the partition has to support consumption multiple times.
-		 *
-		 * <p>The partition requires manual release once all consumption is done:
-		 * {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
-		 * if the partition occupies producer local resources ({@link #storesLocalResourcesOn()}) then also
-		 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-		 */
-		MANUAL
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index ed66f2d..e6a4802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -61,17 +60,18 @@ import java.util.Collection;
  *     <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called
  *     if the production has failed.
  *     </li>
- *     <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} and
- *     {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called when the production is done.
- *     The actual release can take some time depending on implementation details,
- *     e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.
- *     The partition has to support the {@link ReleaseType#AUTO} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
- *     <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} and
- *     {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
- *     if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()}),
- *     are called outside of the producer thread, e.g. to manage the lifecycle of BLOCKING result partitions
- *     which can outlive their producers. The partition has to support the {@link ReleaseType#MANUAL} in
- *     {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
+ *     <li>for PIPELINED partitions if there was a detected consumption attempt and it either failed or finished
+ *     after the bounded production has been done ({@link ResultPartitionWriter#finish()} and
+ *     {@link ResultPartitionWriter#close()} have been called). Only one consumption attempt is ever expected for
+ *     the PIPELINED partition at the moment so it can be released afterwards.
+ *     <li>if the following methods are called outside of the producer thread:
+ *     <ol>
+ *         <li>{@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}</li>
+ *         <li>and if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()})
+ *             then also {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}</li>
+ *     </ol>
+ *     e.g. to manage the lifecycle of BLOCKING result partitions which can outlive their producers.
+ *     The BLOCKING partitions can be consumed multiple times.</li>
  * </ol>
  * The partitions, which currently still occupy local resources, can be queried with
  * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -132,8 +132,6 @@ public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends I
 	 *
 	 * <p>This is called for partitions which occupy resources locally
 	 * (can be checked by {@link ShuffleDescriptor#storesLocalResourcesOn()}).
-	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 *
 	 * @param partitionIds identifying the partitions to be released
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index a9ef1c6..9f729c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.shuffle;
 
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -51,8 +48,6 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
 	 *
 	 * <p>This call triggers release of any resources which are occupied by the given partition in the external systems
 	 * outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions.
-	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 * The producer local resources are managed by {@link ShuffleDescriptor#storesLocalResourcesOn()} and
 	 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
index 7c35516..339f343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -57,9 +56,4 @@ public final class UnknownShuffleDescriptor implements ShuffleDescriptor {
 	public Optional<ResourceID> storesLocalResourcesOn() {
 		return Optional.empty();
 	}
-
-	@Override
-	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-		return EnumSet.noneOf(ReleaseType.class);
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8c99c0f..9b295dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -613,8 +613,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture<ExecutionState> terminationFuture) {
 		final List<ResultPartitionID> partitionsRequiringRelease = tdd.getProducedPartitions().stream()
-			// partitions that are released on consumption don't require any explicit release call
-			.filter(d -> !d.isReleasedOnConsumption())
+			// only blocking partitions require explicit release call
+			.filter(d -> d.getPartitionType().isBlocking())
 			.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
 			// partitions without local resources don't store anything on the TaskExecutor
 			.filter(d -> d.storesLocalResourcesOn().isPresent())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index e73cc6a..38817a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -73,6 +73,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	public NettyShuffleEnvironmentConfiguration(
 			int numNetworkBuffers,
 			int networkBufferSize,
@@ -85,7 +87,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			boolean isNetworkDetailedMetrics,
 			@Nullable NettyConfig nettyConfig,
 			String[] tempDirs,
-			BoundedBlockingSubpartitionType blockingSubpartitionType) {
+			BoundedBlockingSubpartitionType blockingSubpartitionType,
+			boolean forcePartitionReleaseOnConsumption) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
 		this.networkBufferSize = networkBufferSize;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.nettyConfig = nettyConfig;
 		this.tempDirs = Preconditions.checkNotNull(tempDirs);
 		this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 	}
 
 	// ------------------------------------------------------------------------
@@ -151,6 +155,10 @@ public class NettyShuffleEnvironmentConfiguration {
 		return blockingSubpartitionType;
 	}
 
+	public boolean isForcePartitionReleaseOnConsumption() {
+		return forcePartitionReleaseOnConsumption;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -194,6 +202,9 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration);
 
+		boolean forcePartitionReleaseOnConsumption =
+			configuration.getBoolean(NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -206,7 +217,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			isNetworkDetailedMetrics,
 			nettyConfig,
 			tempDirs,
-			blockingSubpartitionType);
+			blockingSubpartitionType,
+			forcePartitionReleaseOnConsumption);
 	}
 
 	/**
@@ -521,6 +533,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		result = 31 * result + (isCreditBased ? 1 : 0);
 		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
 		result = 31 * result + Arrays.hashCode(tempDirs);
+		result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 : 0);
 		return result;
 	}
 
@@ -544,7 +557,8 @@ public class NettyShuffleEnvironmentConfiguration {
 					this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
 					this.isCreditBased == that.isCreditBased &&
 					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
-					Arrays.equals(this.tempDirs, that.tempDirs);
+					Arrays.equals(this.tempDirs, that.tempDirs) &&
+					this.forcePartitionReleaseOnConsumption == that.forcePartitionReleaseOnConsumption;
 		}
 	}
 
@@ -561,6 +575,7 @@ public class NettyShuffleEnvironmentConfiguration {
 				", isCreditBased=" + isCreditBased +
 				", nettyConfig=" + nettyConfig +
 				", tempDirs=" + Arrays.toString(tempDirs) +
+				", forcePartitionReleaseOnConsumption=" + forcePartitionReleaseOnConsumption +
 				'}';
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 49cd478..e76af09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -90,8 +88,7 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			resultPartitionID,
-			false);
+			resultPartitionID);
 
 		ResultPartitionDeploymentDescriptor copy =
 			createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -104,16 +101,6 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
 	}
 
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompatibleReleaseTypeManual() {
-		new ResultPartitionDeploymentDescriptor(
-			partitionDescriptor,
-			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
-			1,
-			true,
-			ReleaseType.MANUAL);
-	}
-
 	private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPartitionDeploymentDescriptor(
 			ShuffleDescriptor shuffleDescriptor) throws IOException {
 		ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 2d360ba..96b6330 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -143,7 +143,8 @@ public class NettyShuffleEnvironmentBuilder {
 				isNetworkDetailedMetrics,
 				nettyConfig,
 				tempDirs,
-				BoundedBlockingSubpartitionType.AUTO),
+				BoundedBlockingSubpartitionType.AUTO,
+				false),
 			taskManagerLocation,
 			taskEventDispatcher,
 			metricGroup);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5f80421..cf50051 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -30,8 +29,6 @@ import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
@@ -97,8 +94,9 @@ public class PartitionTestUtils {
 		}
 	}
 
-	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
-		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
+	static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+		ResultPartitionType partitionType) {
+		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
 			new IntermediateDataSetID(),
 			shuffleDescriptor.getResultPartitionID().getPartitionId(),
@@ -112,52 +110,8 @@ public class PartitionTestUtils {
 			true);
 	}
 
-	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType releaseType) {
-		// set partition to blocking to support all release types
-		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal();
-		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
-			new IntermediateDataSetID(),
-			shuffleDescriptor.getResultPartitionID().getPartitionId(),
-			ResultPartitionType.BLOCKING,
-			1,
-			0);
-		return new ResultPartitionDeploymentDescriptor(
-			partitionDescriptor,
-			shuffleDescriptor,
-			1,
-			true,
-			releaseType);
-	}
-
-	public static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionId, ShuffleDescriptor.ReleaseType releaseType, boolean hasLocalResources) {
-		return new ResultPartitionDeploymentDescriptor(
-			new PartitionDescriptor(
-				new IntermediateDataSetID(),
-				resultPartitionId.getPartitionId(),
-				ResultPartitionType.BLOCKING,
-				1,
-				0),
-			new ShuffleDescriptor() {
-				@Override
-				public ResultPartitionID getResultPartitionID() {
-					return resultPartitionId;
-				}
-
-				@Override
-				public Optional<ResourceID> storesLocalResourcesOn() {
-					return hasLocalResources
-						? Optional.of(ResourceID.generate())
-						: Optional.empty();
-				}
-
-				@Override
-				public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-					return EnumSet.of(releaseType);
-				}
-			},
-			1,
-			true,
-			releaseType);
+	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor() {
+		return createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
 	}
 
 	public static void writeBuffers(ResultPartition partition, int numberOfBuffers, int bufferSize) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 07aba93..5ca7156 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -34,14 +34,12 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -53,15 +51,15 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 	@Test
 	public void testReleasedOnConsumptionPartitionIsNotTracked() {
-		testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.AUTO);
+		testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
 	}
 
 	@Test
 	public void testRetainedOnConsumptionPartitionIsTracked() {
-		testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.MANUAL);
+		testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
 	}
 
-	private void testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType releaseType) {
+	private void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
 		final PartitionTracker partitionTracker = new PartitionTrackerImpl(
 			new JobID(),
 			new TestingShuffleMaster(),
@@ -74,10 +72,11 @@ public class PartitionTrackerImplTest extends TestLogger {
 			resourceId,
 			createResultPartitionDeploymentDescriptor(
 				resultPartitionId,
-				releaseType,
+				resultPartitionType,
 				false));
 
-		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(not(releaseType)));
+		final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
+		assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
 	}
 
 	@Test
@@ -97,7 +96,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			executorWithTrackedPartition,
-			createResultPartitionDeploymentDescriptor(new ResultPartitionID(), ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(new ResultPartitionID(), true));
 
 		assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition), is(true));
 		assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition), is(false));
@@ -127,10 +126,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
 		partitionTracker.startTrackingPartition(
 			taskExecutorId2,
-			createResultPartitionDeploymentDescriptor(resultPartitionId2, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId2, true));
 
 		{
 			partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -183,10 +182,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, false));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, false));
 		partitionTracker.startTrackingPartition(
 			taskExecutorId2,
-			createResultPartitionDeploymentDescriptor(resultPartitionId2, ShuffleDescriptor.ReleaseType.MANUAL, false));
+			createResultPartitionDeploymentDescriptor(resultPartitionId2, false));
 
 		{
 			partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -227,7 +226,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
 		partitionTracker.startTrackingPartition(
 			taskExecutorId1,
-			createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true));
+			createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
 
 		partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);
 
@@ -236,15 +235,21 @@ public class PartitionTrackerImplTest extends TestLogger {
 	}
 
 	private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
+			ResultPartitionID resultPartitionId,
+			boolean hasLocalResources) {
+		return createResultPartitionDeploymentDescriptor(resultPartitionId, ResultPartitionType.BLOCKING, hasLocalResources);
+	}
+
+	private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
 		ResultPartitionID resultPartitionId,
-		ShuffleDescriptor.ReleaseType releaseType,
+		ResultPartitionType type,
 		boolean hasLocalResources) {
 
 		return new ResultPartitionDeploymentDescriptor(
 			new PartitionDescriptor(
 				new IntermediateDataSetID(),
 				resultPartitionId.getPartitionId(),
-				ResultPartitionType.BLOCKING,
+				type,
 				1,
 				0),
 			new ShuffleDescriptor() {
@@ -259,15 +264,9 @@ public class PartitionTrackerImplTest extends TestLogger {
 						? Optional.of(ResourceID.generate())
 						: Optional.empty();
 				}
-
-				@Override
-				public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-					return EnumSet.of(releaseType);
-				}
 			},
 			1,
-			true,
-			releaseType);
+			true);
 	}
 
 	private static TaskExecutorGateway createTaskExecutorGateway(ResourceID taskExecutorId, Collection<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> releaseCalls) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 8eb68d3..27eaab8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -142,7 +142,8 @@ public class ResultPartitionBuilder {
 			blockingSubpartitionType,
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate,
-			networkBufferSize);
+			networkBufferSize,
+			releasedOnConsumption);
 
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() ->
 			resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType));
@@ -153,7 +154,6 @@ public class ResultPartitionBuilder {
 			partitionType,
 			numberOfSubpartitions,
 			numTargetKeyGroups,
-			releasedOnConsumption,
 			factory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index b2a4d16..8065829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
@@ -57,18 +56,26 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	}
 
 	@Test
-	public void testConsumptionOnReleaseEnabled() {
-		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
+	public void testConsumptionOnReleaseForced() {
+		final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING);
+		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
+	}
+
+	@Test
+	public void testConsumptionOnReleaseEnabledForNonBlocking() {
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
 		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
 	}
 
 	@Test
 	public void testConsumptionOnReleaseDisabled() {
-		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.MANUAL);
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
 		assertThat(resultPartition, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
 	}
 
-	private static ResultPartition createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
+	private static ResultPartition createResultPartition(
+			boolean releasePartitionOnConsumption,
+			ResultPartitionType partitionType) {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
 			fileChannelManager,
@@ -76,9 +83,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			BoundedBlockingSubpartitionType.AUTO,
 			1,
 			1,
-			64);
+			64,
+			releasePartitionOnConsumption);
 
-		ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
 		final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(
 			new PartitionDescriptor(
 				new IntermediateDataSetID(),
@@ -86,10 +93,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
 				partitionType,
 				1,
 				0),
-			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
+			NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
 			1,
-			true,
-			releaseType
+			true
 		);
 
 		return factory.create("test", descriptor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index f5999e9..d87cf84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -57,7 +57,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -217,7 +216,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		boolean waitForRelease) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-			PartitionTestUtils.createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType.MANUAL);
+			PartitionTestUtils.createPartitionDeploymentDescriptor();
 		final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
 		final TaskDeploymentDescriptor taskDeploymentDescriptor =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 0ecd81a..2d58d03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -40,8 +40,6 @@ public class NettyShuffleDescriptorBuilder {
 	private InetAddress address = InetAddress.getLoopbackAddress();
 	private int dataPort = 0;
 	private int connectionIndex = 0;
-	private boolean isBlocking;
-
 	public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
 		this.producerLocation = producerLocation;
 		return this;
@@ -74,26 +72,19 @@ public class NettyShuffleDescriptorBuilder {
 		return this;
 	}
 
-	public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
-		this.isBlocking = isBlocking;
-		return this;
-	}
-
 	public NettyShuffleDescriptor buildRemote() {
 		ConnectionID connectionID = new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			id,
-			isBlocking);
+			id);
 	}
 
 	public NettyShuffleDescriptor buildLocal() {
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			LocalExecutionPartitionConnectionInfo.INSTANCE,
-			id,
-			isBlocking);
+			id);
 	}
 
 	public static NettyShuffleDescriptorBuilder newBuilder() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 1549864..9e90c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -58,7 +58,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.configuration.JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
+import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
 import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;


[flink] 13/15: [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based on ResultPartitionType.isBlocking

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18c36868f90838fe520dbb827248497934fa237c
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Jul 31 16:15:30 2019 +0300

    [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based on ResultPartitionType.isBlocking
---
 .../network/partition/ResultPartitionFactory.java  | 27 +++++++++-------------
 .../partition/ResultPartitionFactoryTest.java      | 14 +++++++++++
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 0656e6e..4933a4e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -133,22 +133,17 @@ public class ResultPartitionFactory {
 			BoundedBlockingSubpartitionType blockingSubpartitionType,
 			ResultSubpartition[] subpartitions) {
 		// Create the subpartitions.
-		switch (type) {
-			case BLOCKING:
-			case BLOCKING_PERSISTENT:
-				initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
-				break;
-
-			case PIPELINED:
-			case PIPELINED_BOUNDED:
-				for (int i = 0; i < subpartitions.length; i++) {
-					subpartitions[i] = new PipelinedSubpartition(i, partition);
-				}
-
-				break;
-
-			default:
-				throw new IllegalArgumentException("Unsupported result partition type.");
+		if (type.isBlocking()) {
+			initializeBoundedBlockingPartitions(
+				subpartitions,
+				partition,
+				blockingSubpartitionType,
+				networkBufferSize,
+				channelManager);
+		} else {
+			for (int i = 0; i < subpartitions.length; i++) {
+				subpartitions[i] = new PipelinedSubpartition(i, partition);
+			}
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 1c8591f..653c7f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -32,6 +32,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,6 +60,18 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	}
 
 	@Test
+	public void testBoundedBlockingSubpartitionsCreated() {
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
+		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
+	}
+
+	@Test
+	public void testPipelinedSubpartitionsCreated() {
+		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
+		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
+	}
+
+	@Test
 	public void testConsumptionOnReleaseForced() {
 		final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING);
 		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));


[flink] 03/15: [hotfix][network] fix codestyle issues in ResultPartitionFactory

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7869bcf3578cacc188b09c6d1f0193c11eb39c8
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:27:50 2019 +0300

    [hotfix][network] fix codestyle issues in ResultPartitionFactory
---
 .../network/partition/ResultPartitionFactory.java  | 49 ++++++++++------------
 1 file changed, 21 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index b390987..0656e6e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
@@ -46,13 +44,10 @@ public class ResultPartitionFactory {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
 
-	@Nonnull
 	private final ResultPartitionManager partitionManager;
 
-	@Nonnull
 	private final FileChannelManager channelManager;
 
-	@Nonnull
 	private final BufferPoolFactory bufferPoolFactory;
 
 	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -66,9 +61,9 @@ public class ResultPartitionFactory {
 	private final boolean forcePartitionReleaseOnConsumption;
 
 	public ResultPartitionFactory(
-		@Nonnull ResultPartitionManager partitionManager,
-		@Nonnull FileChannelManager channelManager,
-		@Nonnull BufferPoolFactory bufferPoolFactory,
+		ResultPartitionManager partitionManager,
+		FileChannelManager channelManager,
+		BufferPoolFactory bufferPoolFactory,
 		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
@@ -86,9 +81,8 @@ public class ResultPartitionFactory {
 	}
 
 	public ResultPartition create(
-		@Nonnull String taskNameWithSubtaskAndId,
-		@Nonnull ResultPartitionDeploymentDescriptor desc) {
-
+			String taskNameWithSubtaskAndId,
+			ResultPartitionDeploymentDescriptor desc) {
 		return create(
 			taskNameWithSubtaskAndId,
 			desc.getShuffleDescriptor().getResultPartitionID(),
@@ -100,13 +94,12 @@ public class ResultPartitionFactory {
 
 	@VisibleForTesting
 	public ResultPartition create(
-		@Nonnull String taskNameWithSubtaskAndId,
-		@Nonnull ResultPartitionID id,
-		@Nonnull ResultPartitionType type,
-		int numberOfSubpartitions,
-		int maxParallelism,
-		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
-
+			String taskNameWithSubtaskAndId,
+			ResultPartitionID id,
+			ResultPartitionType type,
+			int numberOfSubpartitions,
+			int maxParallelism,
+			FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
 		ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
@@ -139,10 +132,10 @@ public class ResultPartitionFactory {
 			ResultPartitionType type,
 			BoundedBlockingSubpartitionType blockingSubpartitionType,
 			ResultSubpartition[] subpartitions) {
-
 		// Create the subpartitions.
 		switch (type) {
 			case BLOCKING:
+			case BLOCKING_PERSISTENT:
 				initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
 				break;
 
@@ -160,15 +153,14 @@ public class ResultPartitionFactory {
 	}
 
 	private static void initializeBoundedBlockingPartitions(
-		ResultSubpartition[] subpartitions,
-		ResultPartition parent,
-		BoundedBlockingSubpartitionType blockingSubpartitionType,
-		int networkBufferSize,
-		FileChannelManager channelManager) {
-
+			ResultSubpartition[] subpartitions,
+			ResultPartition parent,
+			BoundedBlockingSubpartitionType blockingSubpartitionType,
+			int networkBufferSize,
+			FileChannelManager channelManager) {
 		int i = 0;
 		try {
-			for (; i < subpartitions.length; i++) {
+			for (i = 0; i < subpartitions.length; i++) {
 				final File spillFile = channelManager.createChannel().getPathFile();
 				subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
 			}
@@ -194,8 +186,8 @@ public class ResultPartitionFactory {
 
 	@VisibleForTesting
 	FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(
-		int numberOfSubpartitions, ResultPartitionType type) {
-
+			int numberOfSubpartitions,
+			ResultPartitionType type) {
 		return p -> {
 			int maxNumberOfMemorySegments = type.isBounded() ?
 				numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
@@ -213,6 +205,7 @@ public class ResultPartitionFactory {
 				return BoundedBlockingSubpartitionType.FILE_MMAP;
 			case _32_BIT:
 				return BoundedBlockingSubpartitionType.FILE;
+			case UNKNOWN:
 			default:
 				LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle.");
 				return BoundedBlockingSubpartitionType.FILE;