You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:08:19 UTC

[flink] 05/05: [FLINK-12883][runtime] Introduce PartitionReleaseStrategy

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9aa9a170ff657198c4710706bc3802de42063ca
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jun 19 15:06:51 2019 +0200

    [FLINK-12883][runtime] Introduce PartitionReleaseStrategy
    
    - Introduce interface PartitionReleaseStrategy.
    - Introduce RegionPartitionReleaseStrategy and
      NotReleasingPartitionReleaseStrategy implementations, which can be configured
      via a new config option.
    - Add unit tests for new classes.
    - Increase visibility of methods in TestingSchedulingTopology for unit tests
      outside of its package.
---
 .../flink/configuration/JobManagerOptions.java     |   9 +
 .../runtime/executiongraph/ExecutionGraph.java     | 132 +++++++++++---
 .../executiongraph/ExecutionGraphBuilder.java      |   6 +
 .../runtime/executiongraph/ExecutionVertex.java    |   1 +
 .../failover/flip1/PipelinedRegionComputeUtil.java |  20 +-
 .../NotReleasingPartitionReleaseStrategy.java      |  56 ++++++
 .../partitionrelease/PartitionReleaseStrategy.java |  58 ++++++
 .../PartitionReleaseStrategyFactoryLoader.java     |  41 +++++
 .../flip1/partitionrelease/PipelinedRegion.java    |  69 +++++++
 .../PipelinedRegionConsumedBlockingPartitions.java |  51 ++++++
 .../PipelinedRegionExecutionView.java              |  64 +++++++
 .../RegionPartitionReleaseStrategy.java            | 190 +++++++++++++++++++
 .../ExecutionGraphPartitionReleaseTest.java        | 202 +++++++++++++++++++++
 .../RegionPartitionReleaseStrategyTest.java        | 149 +++++++++++++++
 .../PartitionReleaseStrategyFactoryLoaderTest.java |  55 ++++++
 .../PipelinedRegionExecutionViewTest.java          |  75 ++++++++
 .../strategy/TestingSchedulingTopology.java        |  14 +-
 17 files changed, 1154 insertions(+), 38 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 89515fd..0f9e18d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -173,6 +173,15 @@ public class JobManagerOptions {
 					text("'legacy': legacy scheduler"),
 					text("'ng': new generation scheduler"))
 				.build());
+	/**
+	 * Config parameter controlling whether partitions should already be released during the job execution.
+	 */
+	@Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " +
+		"We aim at removing this flag eventually.")
+	public static final ConfigOption<Boolean> PARTITION_RELEASE_DURING_JOB_EXECUTION =
+		key("jobmanager.partition.release-during-job-execution")
+			.defaultValue(true)
+			.withDescription("Controls whether partitions should already be released during the job execution.");
 
 	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
 	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 514121e..ce65b68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -55,6 +58,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -66,6 +70,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,6 +259,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	/** The total number of vertices currently in the execution graph. */
 	private int numVerticesTotal;
 
+	private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory;
+
+	private PartitionReleaseStrategy partitionReleaseStrategy;
+
+	private SchedulingTopology schedulingTopology;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -413,6 +428,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			userClassLoader,
 			blobWriter,
 			allocationTimeout,
+			new NotReleasingPartitionReleaseStrategy.Factory(),
 			NettyShuffleMaster.INSTANCE,
 			true,
 			new PartitionTrackerImpl(
@@ -433,6 +449,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			ClassLoader userClassLoader,
 			BlobWriter blobWriter,
 			Time allocationTimeout,
+			PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
 			ShuffleMaster<?> shuffleMaster,
 			boolean forcePartitionReleaseOnConsumption,
 			PartitionTracker partitionTracker) throws IOException {
@@ -464,6 +481,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		this.rpcTimeout = checkNotNull(rpcTimeout);
 		this.allocationTimeout = checkNotNull(allocationTimeout);
 
+		this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory);
+
 		this.restartStrategy = restartStrategy;
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
 
@@ -913,6 +932,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 
 		failoverStrategy.notifyNewVertices(newExecJobVertices);
+
+		schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
+		partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
+			schedulingTopology,
+			new DefaultFailoverTopology(this));
 	}
 
 	public void scheduleForExecution() throws JobException {
@@ -1605,36 +1629,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		if (attempt != null) {
 			try {
-				Map<String, Accumulator<?, ?>> accumulators;
-
-				switch (state.getExecutionState()) {
-					case RUNNING:
-						return attempt.switchToRunning();
-
-					case FINISHED:
-						// this deserialization is exception-free
-						accumulators = deserializeAccumulators(state);
-						attempt.markFinished(accumulators, state.getIOMetrics());
-						return true;
-
-					case CANCELED:
-						// this deserialization is exception-free
-						accumulators = deserializeAccumulators(state);
-						attempt.completeCancelling(accumulators, state.getIOMetrics());
-						return true;
-
-					case FAILED:
-						// this deserialization is exception-free
-						accumulators = deserializeAccumulators(state);
-						attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
-						return true;
-
-					default:
-						// we mark as failed and return false, which triggers the TaskManager
-						// to remove the task
-						attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
-						return false;
-				}
+				final boolean stateUpdated = updateStateInternal(state, attempt);
+				maybeReleasePartitions(attempt);
+				return stateUpdated;
 			}
 			catch (Throwable t) {
 				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
@@ -1649,6 +1646,77 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
+	private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) {
+		Map<String, Accumulator<?, ?>> accumulators;
+
+		switch (state.getExecutionState()) {
+			case RUNNING:
+				return attempt.switchToRunning();
+
+			case FINISHED:
+				// this deserialization is exception-free
+				accumulators = deserializeAccumulators(state);
+				attempt.markFinished(accumulators, state.getIOMetrics());
+				return true;
+
+			case CANCELED:
+				// this deserialization is exception-free
+				accumulators = deserializeAccumulators(state);
+				attempt.completeCancelling(accumulators, state.getIOMetrics());
+				return true;
+
+			case FAILED:
+				// this deserialization is exception-free
+				accumulators = deserializeAccumulators(state);
+				attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+				return true;
+
+			default:
+				// we mark as failed and return false, which triggers the TaskManager
+				// to remove the task
+				attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
+				return false;
+		}
+	}
+
+	private void maybeReleasePartitions(final Execution attempt) {
+		final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
+
+		if (attempt.getState() == ExecutionState.FINISHED) {
+			final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
+			releasePartitions(releasablePartitions);
+		} else {
+			partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
+		}
+	}
+
+	private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
+		if (releasablePartitions.size() > 0) {
+			final List<ResultPartitionID> partitionIds = releasablePartitions.stream()
+				.map(this::createResultPartitionId)
+				.collect(Collectors.toList());
+
+			partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+		}
+	}
+
+	private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
+		final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+		final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
+		final ExecutionVertexID producerId = producer.getId();
+		final JobVertexID jobVertexId = producerId.getJobVertexId();
+		final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
+		checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);
+
+		final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
+		final int subtaskIndex = producerId.getSubtaskIndex();
+		checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId);
+
+		final ExecutionVertex taskVertex = taskVertices[subtaskIndex];
+		final Execution execution = taskVertex.getCurrentExecutionAttempt();
+		return new ResultPartitionID(resultPartitionId, execution.getAttemptId());
+	}
+
 	/**
 	 * Deserializes accumulators from a task state update.
 	 *
@@ -1835,4 +1903,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	public PartitionTracker getPartitionTracker() {
 		return partitionTracker;
 	}
+
+	PartitionReleaseStrategy getPartitionReleaseStrategy() {
+		return partitionReleaseStrategy;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f21d703..1b9d7a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -117,6 +119,9 @@ public class ExecutionGraphBuilder {
 		final int maxPriorAttemptsHistoryLength =
 				jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
 
+		final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
+			PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
+
 		final boolean forcePartitionReleaseOnConsumption =
 			jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
 
@@ -136,6 +141,7 @@ public class ExecutionGraphBuilder {
 					classLoader,
 					blobWriter,
 					allocationTimeout,
+					partitionReleaseStrategyFactory,
 					shuffleMaster,
 					forcePartitionReleaseOnConsumption,
 					partitionTracker);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5fc0b72..349aaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -601,6 +601,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			if (oldState.isTerminal()) {
 				if (oldState == FINISHED) {
 					oldExecution.stopTrackingAndReleasePartitions();
+					getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
 				}
 
 				priorExecutions.add(oldExecution.archive());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 14d28b7..8f19ed9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,6 +19,9 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,14 +30,29 @@ import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
- * Utility for computing pipeliend regions.
+ * Utility for computing pipelined regions.
  */
 public final class PipelinedRegionComputeUtil {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
 
+	public static Set<PipelinedRegion> toPipelinedRegionsSet(final Set<Set<FailoverVertex>> distinctRegions) {
+		return distinctRegions.stream()
+			.map(toExecutionVertexIdSet())
+			.map(PipelinedRegion::from)
+			.collect(Collectors.toSet());
+	}
+
+	private static Function<Set<FailoverVertex>, Set<ExecutionVertexID>> toExecutionVertexIdSet() {
+		return failoverVertices -> failoverVertices.stream()
+			.map(FailoverVertex::getExecutionVertexID)
+			.collect(Collectors.toSet());
+	}
+
 	public static Set<Set<FailoverVertex>> computePipelinedRegions(final FailoverTopology topology) {
 		// currently we let a job with co-location constraints fail as one region
 		// putting co-located vertices in the same region with each other can be a future improvement
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
new file mode 100644
index 0000000..e386870
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Does not release intermediate result partitions during job execution. Relies on partitions being
+ * released at the end of the job.
+ */
+public class NotReleasingPartitionReleaseStrategy implements PartitionReleaseStrategy {
+
+	@Override
+	public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void vertexUnfinished(final ExecutionVertexID executionVertexID) {
+	}
+
+	/**
+	 * Factory for {@link NotReleasingPartitionReleaseStrategy}.
+	 */
+	public static class Factory implements PartitionReleaseStrategy.Factory {
+
+		@Override
+		public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy, final FailoverTopology failoverTopology) {
+			return new NotReleasingPartitionReleaseStrategy();
+		}
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
new file mode 100644
index 0000000..d7b317e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.List;
+
+/**
+ * Interface for strategies that decide when to release
+ * {@link IntermediateResultPartition IntermediateResultPartitions}.
+ */
+public interface PartitionReleaseStrategy {
+
+	/**
+	 * Calling this method informs the strategy that a vertex finished.
+	 *
+	 * @param finishedVertex Id of the vertex that finished the execution
+	 * @return A list of result partitions that can be released
+	 */
+	List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex);
+
+	/**
+	 * Calling this method informs the strategy that a vertex is no longer in finished state, e.g.,
+	 * when a vertex is re-executed.
+	 *
+	 * @param executionVertexID Id of the vertex that is no longer in finished state.
+	 */
+	void vertexUnfinished(ExecutionVertexID executionVertexID);
+
+	/**
+	 * Factory for {@link PartitionReleaseStrategy}.
+	 */
+	interface Factory {
+		PartitionReleaseStrategy createInstance(SchedulingTopology schedulingStrategy, FailoverTopology failoverTopology);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
new file mode 100644
index 0000000..744a270
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+/**
+ * Instantiates a {@link RegionPartitionReleaseStrategy}.
+ */
+public final class PartitionReleaseStrategyFactoryLoader {
+
+	public static PartitionReleaseStrategy.Factory loadPartitionReleaseStrategyFactory(final Configuration configuration) {
+		final boolean partitionReleaseDuringJobExecution = configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
+		if (partitionReleaseDuringJobExecution) {
+			return new RegionPartitionReleaseStrategy.Factory();
+		} else {
+			return new NotReleasingPartitionReleaseStrategy.Factory();
+		}
+	}
+
+	private PartitionReleaseStrategyFactoryLoader() {
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
new file mode 100644
index 0000000..36c042e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Set of execution vertices that are connected through pipelined intermediate result partitions.
+ */
+public class PipelinedRegion implements Iterable<ExecutionVertexID> {
+
+	private final Set<ExecutionVertexID> executionVertexIds;
+
+	private PipelinedRegion(final Set<ExecutionVertexID> executionVertexIds) {
+		this.executionVertexIds = new HashSet<>(checkNotNull(executionVertexIds));
+	}
+
+	public static PipelinedRegion from(final Set<ExecutionVertexID> executionVertexIds) {
+		return new PipelinedRegion(executionVertexIds);
+	}
+
+	public static PipelinedRegion from(final ExecutionVertexID... executionVertexIds) {
+		return new PipelinedRegion(new HashSet<>(Arrays.asList(executionVertexIds)));
+	}
+
+	public Set<ExecutionVertexID> getExecutionVertexIds() {
+		return executionVertexIds;
+	}
+
+	public boolean contains(final ExecutionVertexID executionVertexId) {
+		return executionVertexIds.contains(executionVertexId);
+	}
+
+	@Override
+	public Iterator<ExecutionVertexID> iterator() {
+		return executionVertexIds.iterator();
+	}
+
+	@Override
+	public String toString() {
+		return "PipelinedRegion{" +
+			"executionVertexIds=" + executionVertexIds +
+			'}';
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
new file mode 100644
index 0000000..5cbce45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A set of intermediate result partitions that are incident to one {@link PipelinedRegion}.
+ */
+class PipelinedRegionConsumedBlockingPartitions {
+
+	private final PipelinedRegion pipelinedRegion;
+
+	private final Set<IntermediateResultPartitionID> consumedBlockingPartitions;
+
+	PipelinedRegionConsumedBlockingPartitions(
+			final PipelinedRegion pipelinedRegion,
+			final Set<IntermediateResultPartitionID> consumedBlockingPartitions) {
+		this.pipelinedRegion = checkNotNull(pipelinedRegion);
+		this.consumedBlockingPartitions = checkNotNull(consumedBlockingPartitions);
+	}
+
+	public Set<IntermediateResultPartitionID> getConsumedBlockingPartitions() {
+		return consumedBlockingPartitions;
+	}
+
+	public PipelinedRegion getPipelinedRegion() {
+		return pipelinedRegion;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
new file mode 100644
index 0000000..c92fa8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a virtual execution state of a {@link PipelinedRegion}.
+ *
+ * <p>A pipelined region can be either finished or unfinished. It is finished iff. all its
+ * executions have reached the finished state.
+ */
+class PipelinedRegionExecutionView {
+
+	private final PipelinedRegion pipelinedRegion;
+
+	private final Set<ExecutionVertexID> unfinishedVertices;
+
+	PipelinedRegionExecutionView(final PipelinedRegion pipelinedRegion) {
+		this.pipelinedRegion = checkNotNull(pipelinedRegion);
+		this.unfinishedVertices = new HashSet<>(pipelinedRegion.getExecutionVertexIds());
+	}
+
+	public boolean isFinished() {
+		return unfinishedVertices.isEmpty();
+	}
+
+	public void vertexFinished(final ExecutionVertexID executionVertexId) {
+		checkArgument(pipelinedRegion.contains(executionVertexId));
+		unfinishedVertices.remove(executionVertexId);
+	}
+
+	public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
+		checkArgument(pipelinedRegion.contains(executionVertexId));
+		unfinishedVertices.add(executionVertexId);
+	}
+
+	public PipelinedRegion getPipelinedRegion() {
+		return pipelinedRegion;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
new file mode 100644
index 0000000..b930e10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Releases blocking intermediate result partitions that are incident to a {@link PipelinedRegion},
+ * as soon as the region's execution vertices are finished.
+ */
+public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy {
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>();
+
+	private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();
+
+	public RegionPartitionReleaseStrategy(
+			final SchedulingTopology schedulingTopology,
+			final Set<PipelinedRegion> pipelinedRegions) {
+
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+
+		checkNotNull(pipelinedRegions);
+		initConsumedBlockingPartitionsByRegion(pipelinedRegions);
+		initRegionExecutionViewByVertex(pipelinedRegions);
+	}
+
+	private void initConsumedBlockingPartitionsByRegion(final Set<PipelinedRegion> pipelinedRegions) {
+		for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+			final PipelinedRegionConsumedBlockingPartitions consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion);
+			consumedBlockingPartitionsByRegion.put(pipelinedRegion, consumedPartitions);
+		}
+	}
+
+	private void initRegionExecutionViewByVertex(final Set<PipelinedRegion> pipelinedRegions) {
+		for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+			final PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+			for (ExecutionVertexID executionVertexId : pipelinedRegion) {
+				regionExecutionViewByVertex.put(executionVertexId, regionExecutionView);
+			}
+		}
+	}
+
+	private PipelinedRegionConsumedBlockingPartitions computeConsumedPartitionsOfVertexRegion(final PipelinedRegion pipelinedRegion) {
+		final Set<IntermediateResultPartitionID> resultPartitionsOutsideOfRegion = findResultPartitionsOutsideOfRegion(pipelinedRegion);
+		return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion);
+	}
+
+	private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
+		final Set<SchedulingResultPartition> allConsumedPartitionsInRegion = pipelinedRegion
+			.getExecutionVertexIds()
+			.stream()
+			.map(schedulingTopology::getVertexOrThrow)
+			.flatMap(schedulingExecutionVertex -> schedulingExecutionVertex.getConsumedResultPartitions().stream())
+			.collect(Collectors.toSet());
+
+		return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion);
+	}
+
+	private static Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion(
+			final Collection<SchedulingResultPartition> resultPartitions,
+			final PipelinedRegion pipelinedRegion) {
+
+		final Set<IntermediateResultPartitionID> result = new HashSet<>();
+		for (final SchedulingResultPartition maybeOutsidePartition : resultPartitions) {
+			final SchedulingExecutionVertex producer = maybeOutsidePartition.getProducer();
+			if (!pipelinedRegion.contains(producer.getId())) {
+				result.add(maybeOutsidePartition.getId());
+			}
+		}
+		return result;
+	}
+
+	@Override
+	public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
+		final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(finishedVertex);
+		regionExecutionView.vertexFinished(finishedVertex);
+
+		if (regionExecutionView.isFinished()) {
+			final PipelinedRegion pipelinedRegion = getPipelinedRegionForVertex(finishedVertex);
+			final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion = getConsumedBlockingPartitionsForRegion(pipelinedRegion);
+			return filterReleasablePartitions(consumedPartitionsOfVertexRegion);
+		}
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
+		final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+		regionExecutionView.vertexUnfinished(executionVertexId);
+	}
+
+	private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(final ExecutionVertexID executionVertexId) {
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = regionExecutionViewByVertex.get(executionVertexId);
+		checkState(pipelinedRegionExecutionView != null,
+			"PipelinedRegionExecutionView not found for execution vertex %s", executionVertexId);
+		return pipelinedRegionExecutionView;
+	}
+
+	private PipelinedRegion getPipelinedRegionForVertex(final ExecutionVertexID executionVertexId) {
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+		return pipelinedRegionExecutionView.getPipelinedRegion();
+	}
+
+	private PipelinedRegionConsumedBlockingPartitions getConsumedBlockingPartitionsForRegion(final PipelinedRegion pipelinedRegion) {
+		final PipelinedRegionConsumedBlockingPartitions pipelinedRegionConsumedBlockingPartitions = consumedBlockingPartitionsByRegion.get(pipelinedRegion);
+		checkState(pipelinedRegionConsumedBlockingPartitions != null,
+			"Consumed partitions not found for pipelined region %s", pipelinedRegion);
+		checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == pipelinedRegion);
+		return pipelinedRegionConsumedBlockingPartitions;
+	}
+
+	private List<IntermediateResultPartitionID> filterReleasablePartitions(final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion) {
+		return consumedPartitionsOfVertexRegion
+			.getConsumedBlockingPartitions()
+			.stream()
+			.filter(this::areConsumerRegionsFinished)
+			.collect(Collectors.toList());
+	}
+
+	private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) {
+		final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+		final Collection<SchedulingExecutionVertex> consumers = resultPartition.getConsumers();
+		return consumers
+			.stream()
+			.map(SchedulingExecutionVertex::getId)
+			.allMatch(this::isRegionOfVertexFinished);
+	}
+
+	private boolean isRegionOfVertexFinished(final ExecutionVertexID executionVertexId) {
+		final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+		return regionExecutionView.isFinished();
+	}
+
+	/**
+	 * Factory for {@link PartitionReleaseStrategy}.
+	 */
+	public static class Factory implements PartitionReleaseStrategy.Factory {
+
+		@Override
+		public PartitionReleaseStrategy createInstance(
+				final SchedulingTopology schedulingStrategy,
+				final FailoverTopology failoverTopology) {
+
+			final Set<Set<FailoverVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(failoverTopology);
+			return new RegionPartitionReleaseStrategy(
+				schedulingStrategy,
+				PipelinedRegionComputeUtil.toPipelinedRegionsSet(distinctRegions));
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
new file mode 100644
index 0000000..0110642
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+/**
+ * Tests for the interactions of the {@link ExecutionGraph} and {@link PartitionReleaseStrategy}.
+ */
+public class ExecutionGraphPartitionReleaseTest extends TestLogger {
+
+	private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+	private static final TestingComponentMainThreadExecutor mainThreadExecutor =
+		new TestingComponentMainThreadExecutor(
+			TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
+
+	@Test
+	public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
+		// setup a simple pipeline of 3 operators with blocking partitions
+		final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+		final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+		final JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+
+		operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+		sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		// setup partition tracker to intercept partition release calls
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
+			partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));
+
+		final ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, sinkVertex);
+
+		// finish vertices one after another, and verify that the appropriate partitions are released
+		mainThreadExecutor.execute(() -> {
+			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, empty());
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+			final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, hasSize(1));
+			assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
+				sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+				sourceExecution.getAttemptId())));
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
+			final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph);
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), ExecutionState.FINISHED));
+
+			assertThat(releasedPartitions, hasSize(1));
+			assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
+				operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+				operatorExecution.getAttemptId())));
+		});
+	}
+
+	@Test
+	public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
+		// setup a pipeline of 2 failover regions (f1 -> f2), where
+		// f1 is just a source
+		// f2 consists of 3 operators (o1,o2,o3), where o1 consumes f1, and o2/o3 consume o1
+		final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
+		final JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
+		final JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
+		final JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
+
+		operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+		operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		// setup partition tracker to intercept partition release calls
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
+			partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));
+
+		final ExecutionGraph executionGraph = createExecutionGraph(
+			partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);
+
+		mainThreadExecutor.execute(() -> {
+			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+			// finish the source; this should not result in any release calls since the consumer o1 was not finished
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, empty());
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
+			// finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished
+			for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
+				executionGraph.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
+			}
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, empty());
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
+			// finish o2; this should not result in any release calls since o3 was not finished
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator2Execution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, empty());
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
+			// reset o2
+			operator2Execution.getVertex().resetForNewExecution(0L, 1L);
+			assertThat(releasedPartitions, empty());
+		});
+
+		mainThreadExecutor.execute(() -> {
+			final Execution operator3Execution = getCurrentExecution(operator3Vertex, executionGraph);
+			// finish o3; this should not result in any release calls since o2 was reset
+			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator3Execution.getAttemptId(), ExecutionState.FINISHED));
+			assertThat(releasedPartitions, empty());
+		});
+	}
+
+	private static Execution getCurrentExecution(final JobVertex jobVertex, final ExecutionGraph executionGraph) {
+		return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+	}
+
+	private ExecutionGraph createExecutionGraph(final PartitionTracker partitionTracker, final JobVertex... vertices) throws Exception {
+		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+			null,
+			new JobGraph(new JobID(), "test job", vertices),
+			new Configuration(),
+			scheduledExecutorService,
+			mainThreadExecutor.getMainThreadExecutor(),
+			new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())),
+			ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			VoidBlobWriter.getInstance(),
+			AkkaUtils.getDefaultTimeout(),
+			log,
+			NettyShuffleMaster.INSTANCE,
+			partitionTracker);
+
+		executionGraph.start(mainThreadExecutor.getMainThreadExecutor());
+		mainThreadExecutor.execute(executionGraph::scheduleForExecution);
+
+		return executionGraph;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
new file mode 100644
index 0000000..1b9ca7a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link RegionPartitionReleaseStrategy}.
+ */
+public class RegionPartitionReleaseStrategyTest extends TestLogger {
+
+	private TestingSchedulingTopology testingSchedulingTopology;
+
+	@Before
+	public void setUp() throws Exception {
+		testingSchedulingTopology = new TestingSchedulingTopology();
+	}
+
+	@Test
+	public void releasePartitionsIfDownstreamRegionIsFinished() {
+		final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingResultPartition> resultPartitions = testingSchedulingTopology.connectPointwise(producers, consumers).finish();
+
+		final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+		final ExecutionVertexID onlyConsumerVertexId = consumers.get(0).getId();
+		final IntermediateResultPartitionID onlyResultPartitionId = resultPartitions.get(0).getId();
+
+		final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+			PipelinedRegion.from(onlyProducerVertexId),
+			PipelinedRegion.from(onlyConsumerVertexId));
+
+		final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+		final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId);
+		assertThat(partitionsToRelease, contains(onlyResultPartitionId));
+	}
+
+	@Test
+	public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
+		final List<TestingSchedulingExecutionVertex> sourceVertices = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingExecutionVertex> intermediateVertices = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingExecutionVertex> sinkVertices = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingResultPartition> sourceResultPartitions = testingSchedulingTopology.connectAllToAll(sourceVertices, intermediateVertices).finish();
+		testingSchedulingTopology.connectAllToAll(intermediateVertices, sinkVertices).withResultPartitionType(ResultPartitionType.PIPELINED).finish();
+
+		final ExecutionVertexID onlySourceVertexId = sourceVertices.get(0).getId();
+		final ExecutionVertexID onlyIntermediateVertexId = intermediateVertices.get(0).getId();
+		final ExecutionVertexID onlySinkVertexId = sinkVertices.get(0).getId();
+		final IntermediateResultPartitionID onlySourceResultPartitionId = sourceResultPartitions.get(0).getId();
+
+		final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+			PipelinedRegion.from(onlySourceVertexId),
+			PipelinedRegion.from(onlyIntermediateVertexId, onlySinkVertexId));
+
+		final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+		regionPartitionReleaseStrategy.vertexFinished(onlyIntermediateVertexId);
+		final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlySinkVertexId);
+		assertThat(partitionsToRelease, contains(onlySourceResultPartitionId));
+	}
+
+	@Test
+	public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
+		final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+		testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
+
+		final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+		final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
+		final ExecutionVertexID consumerVertex2 = consumers.get(1).getId();
+
+		final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+			PipelinedRegion.from(onlyProducerVertexId),
+			PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+		final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+		final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+		assertThat(partitionsToRelease, is(empty()));
+	}
+
+	@Test
+	public void toggleVertexFinishedUnfinished() {
+		final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+		final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+		testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
+
+		final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+		final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
+		final ExecutionVertexID consumerVertex2 = consumers.get(1).getId();
+
+		final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+			PipelinedRegion.from(onlyProducerVertexId),
+			PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+		final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+		regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+		regionPartitionReleaseStrategy.vertexFinished(consumerVertex2);
+		regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
+
+		final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+		assertThat(partitionsToRelease, is(empty()));
+	}
+
+	private static Set<PipelinedRegion> pipelinedRegionsSet(final PipelinedRegion... pipelinedRegions) {
+		return new HashSet<>(Arrays.asList(pipelinedRegions));
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
new file mode 100644
index 0000000..c108938
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PartitionReleaseStrategyFactoryLoader}.
+ */
+public class PartitionReleaseStrategyFactoryLoaderTest {
+
+	@Test
+	public void featureEnabledByDefault() {
+		final Configuration emptyConfiguration = new Configuration();
+		final PartitionReleaseStrategy.Factory factory =
+			PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+		assertThat(factory, is(instanceOf(RegionPartitionReleaseStrategy.Factory.class)));
+	}
+
+	@Test
+	public void featureCanBeDisabled() {
+		final Configuration emptyConfiguration = new Configuration();
+		emptyConfiguration.setBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION, false);
+
+		final PartitionReleaseStrategy.Factory factory =
+			PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+		assertThat(factory, is(instanceOf(NotReleasingPartitionReleaseStrategy.Factory.class)));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
new file mode 100644
index 0000000..a6960ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PipelinedRegionExecutionView}.
+ */
+public class PipelinedRegionExecutionViewTest extends TestLogger {
+
+	private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID = new ExecutionVertexID(new JobVertexID(), 0);
+
+	@Test
+	public void regionIsUnfinishedIfNotAllVerticesAreFinished() {
+		final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+		assertFalse(pipelinedRegionExecutionView.isFinished());
+	}
+
+	@Test
+	public void regionIsFinishedIfAllVerticesAreFinished() {
+		final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+		pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+
+		assertTrue(pipelinedRegionExecutionView.isFinished());
+	}
+
+	@Test
+	public void vertexCanBeUnfinished() {
+		final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+		pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+		pipelinedRegionExecutionView.vertexUnfinished(TEST_EXECUTION_VERTEX_ID);
+
+		assertFalse(pipelinedRegionExecutionView.isFinished());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void finishingUnknownVertexThrowsException() {
+		final PipelinedRegion from = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+		final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(from);
+
+		final ExecutionVertexID unknownVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		pipelinedRegionExecutionView.vertexFinished(unknownVertexId);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 1aaac58..e0ea6c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -79,18 +79,18 @@ public class TestingSchedulingTopology implements SchedulingTopology {
 		}
 	}
 
-	SchedulingExecutionVerticesBuilder addExecutionVertices() {
+	public SchedulingExecutionVerticesBuilder addExecutionVertices() {
 		return new SchedulingExecutionVerticesBuilder();
 	}
 
-	ProducerConsumerConnectionBuilder connectPointwise(
+	public ProducerConsumerConnectionBuilder connectPointwise(
 		final List<TestingSchedulingExecutionVertex> producers,
 		final List<TestingSchedulingExecutionVertex> consumers) {
 
 		return new ProducerConsumerPointwiseConnectionBuilder(producers, consumers);
 	}
 
-	ProducerConsumerConnectionBuilder connectAllToAll(
+	public ProducerConsumerConnectionBuilder connectAllToAll(
 		final List<TestingSchedulingExecutionVertex> producers,
 		final List<TestingSchedulingExecutionVertex> consumers) {
 
@@ -117,12 +117,12 @@ public class TestingSchedulingTopology implements SchedulingTopology {
 			this.consumers = consumers;
 		}
 
-		ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) {
+		public ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) {
 			this.resultPartitionType = resultPartitionType;
 			return this;
 		}
 
-		ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) {
+		public ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) {
 			this.resultPartitionState = state;
 			return this;
 		}
@@ -229,12 +229,12 @@ public class TestingSchedulingTopology implements SchedulingTopology {
 
 		private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
 
-		SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) {
+		public SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) {
 			this.parallelism = parallelism;
 			return this;
 		}
 
-		SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) {
+		public SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) {
 			this.inputDependencyConstraint = inputDependencyConstraint;
 			return this;
 		}