You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:08:19 UTC
[flink] 05/05: [FLINK-12883][runtime] Introduce
PartitionReleaseStrategy
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c9aa9a170ff657198c4710706bc3802de42063ca
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jun 19 15:06:51 2019 +0200
[FLINK-12883][runtime] Introduce PartitionReleaseStrategy
- Introduce interface PartitionReleaseStrategy.
- Introduce RegionPartitionReleaseStrategy and
NotReleasingPartitionReleaseStrategy implementations, which can be configured
via a new config option.
- Add unit tests for new classes.
- Increase visibility of methods in TestingSchedulingTopology for unit tests
outside of its package.
---
.../flink/configuration/JobManagerOptions.java | 9 +
.../runtime/executiongraph/ExecutionGraph.java | 132 +++++++++++---
.../executiongraph/ExecutionGraphBuilder.java | 6 +
.../runtime/executiongraph/ExecutionVertex.java | 1 +
.../failover/flip1/PipelinedRegionComputeUtil.java | 20 +-
.../NotReleasingPartitionReleaseStrategy.java | 56 ++++++
.../partitionrelease/PartitionReleaseStrategy.java | 58 ++++++
.../PartitionReleaseStrategyFactoryLoader.java | 41 +++++
.../flip1/partitionrelease/PipelinedRegion.java | 69 +++++++
.../PipelinedRegionConsumedBlockingPartitions.java | 51 ++++++
.../PipelinedRegionExecutionView.java | 64 +++++++
.../RegionPartitionReleaseStrategy.java | 190 +++++++++++++++++++
.../ExecutionGraphPartitionReleaseTest.java | 202 +++++++++++++++++++++
.../RegionPartitionReleaseStrategyTest.java | 149 +++++++++++++++
.../PartitionReleaseStrategyFactoryLoaderTest.java | 55 ++++++
.../PipelinedRegionExecutionViewTest.java | 75 ++++++++
.../strategy/TestingSchedulingTopology.java | 14 +-
17 files changed, 1154 insertions(+), 38 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 89515fd..0f9e18d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -173,6 +173,15 @@ public class JobManagerOptions {
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
+ /**
+ * Config parameter controlling whether partitions should already be released during the job execution.
+ */
+ @Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " +
+ "We aim at removing this flag eventually.")
+ public static final ConfigOption<Boolean> PARTITION_RELEASE_DURING_JOB_EXECUTION =
+ key("jobmanager.partition.release-during-job-execution")
+ .defaultValue(true)
+ .withDescription("Controls whether partitions should already be released during the job execution.");
@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 514121e..ce65b68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -55,6 +58,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -66,6 +70,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,6 +259,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
/** The total number of vertices currently in the execution graph. */
private int numVerticesTotal;
+ private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory;
+
+ private PartitionReleaseStrategy partitionReleaseStrategy;
+
+ private SchedulingTopology schedulingTopology;
+
// ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -413,6 +428,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
userClassLoader,
blobWriter,
allocationTimeout,
+ new NotReleasingPartitionReleaseStrategy.Factory(),
NettyShuffleMaster.INSTANCE,
true,
new PartitionTrackerImpl(
@@ -433,6 +449,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout,
+ PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
boolean forcePartitionReleaseOnConsumption,
PartitionTracker partitionTracker) throws IOException {
@@ -464,6 +481,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
this.rpcTimeout = checkNotNull(rpcTimeout);
this.allocationTimeout = checkNotNull(allocationTimeout);
+ this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory);
+
this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
@@ -913,6 +932,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
failoverStrategy.notifyNewVertices(newExecJobVertices);
+
+ schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
+ partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
+ schedulingTopology,
+ new DefaultFailoverTopology(this));
}
public void scheduleForExecution() throws JobException {
@@ -1605,36 +1629,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
if (attempt != null) {
try {
- Map<String, Accumulator<?, ?>> accumulators;
-
- switch (state.getExecutionState()) {
- case RUNNING:
- return attempt.switchToRunning();
-
- case FINISHED:
- // this deserialization is exception-free
- accumulators = deserializeAccumulators(state);
- attempt.markFinished(accumulators, state.getIOMetrics());
- return true;
-
- case CANCELED:
- // this deserialization is exception-free
- accumulators = deserializeAccumulators(state);
- attempt.completeCancelling(accumulators, state.getIOMetrics());
- return true;
-
- case FAILED:
- // this deserialization is exception-free
- accumulators = deserializeAccumulators(state);
- attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
- return true;
-
- default:
- // we mark as failed and return false, which triggers the TaskManager
- // to remove the task
- attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
- return false;
- }
+ final boolean stateUpdated = updateStateInternal(state, attempt);
+ maybeReleasePartitions(attempt);
+ return stateUpdated;
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
@@ -1649,6 +1646,77 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
+ private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) {
+ Map<String, Accumulator<?, ?>> accumulators;
+
+ switch (state.getExecutionState()) {
+ case RUNNING:
+ return attempt.switchToRunning();
+
+ case FINISHED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.markFinished(accumulators, state.getIOMetrics());
+ return true;
+
+ case CANCELED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.completeCancelling(accumulators, state.getIOMetrics());
+ return true;
+
+ case FAILED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+ return true;
+
+ default:
+ // we mark as failed and return false, which triggers the TaskManager
+ // to remove the task
+ attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
+ return false;
+ }
+ }
+
+ private void maybeReleasePartitions(final Execution attempt) {
+ final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
+
+ if (attempt.getState() == ExecutionState.FINISHED) {
+ final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
+ releasePartitions(releasablePartitions);
+ } else {
+ partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
+ }
+ }
+
+ private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
+ if (releasablePartitions.size() > 0) {
+ final List<ResultPartitionID> partitionIds = releasablePartitions.stream()
+ .map(this::createResultPartitionId)
+ .collect(Collectors.toList());
+
+ partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+ }
+ }
+
+ private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
+ final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+ final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
+ final ExecutionVertexID producerId = producer.getId();
+ final JobVertexID jobVertexId = producerId.getJobVertexId();
+ final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
+ checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);
+
+ final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
+ final int subtaskIndex = producerId.getSubtaskIndex();
+ checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId);
+
+ final ExecutionVertex taskVertex = taskVertices[subtaskIndex];
+ final Execution execution = taskVertex.getCurrentExecutionAttempt();
+ return new ResultPartitionID(resultPartitionId, execution.getAttemptId());
+ }
+
/**
* Deserializes accumulators from a task state update.
*
@@ -1835,4 +1903,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
public PartitionTracker getPartitionTracker() {
return partitionTracker;
}
+
+ PartitionReleaseStrategy getPartitionReleaseStrategy() {
+ return partitionReleaseStrategy;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f21d703..1b9d7a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -117,6 +119,9 @@ public class ExecutionGraphBuilder {
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
+ final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
+ PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
+
final boolean forcePartitionReleaseOnConsumption =
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
@@ -136,6 +141,7 @@ public class ExecutionGraphBuilder {
classLoader,
blobWriter,
allocationTimeout,
+ partitionReleaseStrategyFactory,
shuffleMaster,
forcePartitionReleaseOnConsumption,
partitionTracker);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5fc0b72..349aaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -601,6 +601,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
if (oldState.isTerminal()) {
if (oldState == FINISHED) {
oldExecution.stopTrackingAndReleasePartitions();
+ getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
}
priorExecutions.add(oldExecution.archive());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 14d28b7..8f19ed9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,14 +30,29 @@ import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
- * Utility for computing pipeliend regions.
+ * Utility for computing pipelined regions.
*/
public final class PipelinedRegionComputeUtil {
private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
+ public static Set<PipelinedRegion> toPipelinedRegionsSet(final Set<Set<FailoverVertex>> distinctRegions) {
+ return distinctRegions.stream()
+ .map(toExecutionVertexIdSet())
+ .map(PipelinedRegion::from)
+ .collect(Collectors.toSet());
+ }
+
+ private static Function<Set<FailoverVertex>, Set<ExecutionVertexID>> toExecutionVertexIdSet() {
+ return failoverVertices -> failoverVertices.stream()
+ .map(FailoverVertex::getExecutionVertexID)
+ .collect(Collectors.toSet());
+ }
+
public static Set<Set<FailoverVertex>> computePipelinedRegions(final FailoverTopology topology) {
// currently we let a job with co-location constraints fail as one region
// putting co-located vertices in the same region with each other can be a future improvement
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
new file mode 100644
index 0000000..e386870
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Does not release intermediate result partitions during job execution. Relies on partitions being
+ * released at the end of the job.
+ */
+public class NotReleasingPartitionReleaseStrategy implements PartitionReleaseStrategy {
+
+ @Override
+ public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void vertexUnfinished(final ExecutionVertexID executionVertexID) {
+ }
+
+ /**
+ * Factory for {@link NotReleasingPartitionReleaseStrategy}.
+ */
+ public static class Factory implements PartitionReleaseStrategy.Factory {
+
+ @Override
+ public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy, final FailoverTopology failoverTopology) {
+ return new NotReleasingPartitionReleaseStrategy();
+ }
+ }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
new file mode 100644
index 0000000..d7b317e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.List;
+
+/**
+ * Interface for strategies that decide when to release
+ * {@link IntermediateResultPartition IntermediateResultPartitions}.
+ */
+public interface PartitionReleaseStrategy {
+
+ /**
+ * Calling this method informs the strategy that a vertex finished.
+ *
+ * @param finishedVertex Id of the vertex that finished the execution
+ * @return A list of result partitions that can be released
+ */
+ List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex);
+
+ /**
+ * Calling this method informs the strategy that a vertex is no longer in finished state, e.g.,
+ * when a vertex is re-executed.
+ *
+ * @param executionVertexID Id of the vertex that is no longer in finished state.
+ */
+ void vertexUnfinished(ExecutionVertexID executionVertexID);
+
+ /**
+ * Factory for {@link PartitionReleaseStrategy}.
+ */
+ interface Factory {
+ PartitionReleaseStrategy createInstance(SchedulingTopology schedulingStrategy, FailoverTopology failoverTopology);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
new file mode 100644
index 0000000..744a270
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+/**
+ * Instantiates a {@link RegionPartitionReleaseStrategy}.
+ */
+public final class PartitionReleaseStrategyFactoryLoader {
+
+ public static PartitionReleaseStrategy.Factory loadPartitionReleaseStrategyFactory(final Configuration configuration) {
+ final boolean partitionReleaseDuringJobExecution = configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
+ if (partitionReleaseDuringJobExecution) {
+ return new RegionPartitionReleaseStrategy.Factory();
+ } else {
+ return new NotReleasingPartitionReleaseStrategy.Factory();
+ }
+ }
+
+ private PartitionReleaseStrategyFactoryLoader() {
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
new file mode 100644
index 0000000..36c042e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Set of execution vertices that are connected through pipelined intermediate result partitions.
+ */
+public class PipelinedRegion implements Iterable<ExecutionVertexID> {
+
+ private final Set<ExecutionVertexID> executionVertexIds;
+
+ private PipelinedRegion(final Set<ExecutionVertexID> executionVertexIds) {
+ this.executionVertexIds = new HashSet<>(checkNotNull(executionVertexIds));
+ }
+
+ public static PipelinedRegion from(final Set<ExecutionVertexID> executionVertexIds) {
+ return new PipelinedRegion(executionVertexIds);
+ }
+
+ public static PipelinedRegion from(final ExecutionVertexID... executionVertexIds) {
+ return new PipelinedRegion(new HashSet<>(Arrays.asList(executionVertexIds)));
+ }
+
+ public Set<ExecutionVertexID> getExecutionVertexIds() {
+ return executionVertexIds;
+ }
+
+ public boolean contains(final ExecutionVertexID executionVertexId) {
+ return executionVertexIds.contains(executionVertexId);
+ }
+
+ @Override
+ public Iterator<ExecutionVertexID> iterator() {
+ return executionVertexIds.iterator();
+ }
+
+ @Override
+ public String toString() {
+ return "PipelinedRegion{" +
+ "executionVertexIds=" + executionVertexIds +
+ '}';
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
new file mode 100644
index 0000000..5cbce45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A set of intermediate result partitions that are incident to one {@link PipelinedRegion}.
+ */
+class PipelinedRegionConsumedBlockingPartitions {
+
+ private final PipelinedRegion pipelinedRegion;
+
+ private final Set<IntermediateResultPartitionID> consumedBlockingPartitions;
+
+ PipelinedRegionConsumedBlockingPartitions(
+ final PipelinedRegion pipelinedRegion,
+ final Set<IntermediateResultPartitionID> consumedBlockingPartitions) {
+ this.pipelinedRegion = checkNotNull(pipelinedRegion);
+ this.consumedBlockingPartitions = checkNotNull(consumedBlockingPartitions);
+ }
+
+ public Set<IntermediateResultPartitionID> getConsumedBlockingPartitions() {
+ return consumedBlockingPartitions;
+ }
+
+ public PipelinedRegion getPipelinedRegion() {
+ return pipelinedRegion;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
new file mode 100644
index 0000000..c92fa8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a virtual execution state of a {@link PipelinedRegion}.
+ *
+ * <p>A pipelined region can be either finished or unfinished. It is finished iff. all its
+ * executions have reached the finished state.
+ */
+class PipelinedRegionExecutionView {
+
+ private final PipelinedRegion pipelinedRegion;
+
+ private final Set<ExecutionVertexID> unfinishedVertices;
+
+ PipelinedRegionExecutionView(final PipelinedRegion pipelinedRegion) {
+ this.pipelinedRegion = checkNotNull(pipelinedRegion);
+ this.unfinishedVertices = new HashSet<>(pipelinedRegion.getExecutionVertexIds());
+ }
+
+ public boolean isFinished() {
+ return unfinishedVertices.isEmpty();
+ }
+
+ public void vertexFinished(final ExecutionVertexID executionVertexId) {
+ checkArgument(pipelinedRegion.contains(executionVertexId));
+ unfinishedVertices.remove(executionVertexId);
+ }
+
+ public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
+ checkArgument(pipelinedRegion.contains(executionVertexId));
+ unfinishedVertices.add(executionVertexId);
+ }
+
+ public PipelinedRegion getPipelinedRegion() {
+ return pipelinedRegion;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
new file mode 100644
index 0000000..b930e10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Releases blocking intermediate result partitions that are incident to a {@link PipelinedRegion},
+ * as soon as the region's execution vertices are finished.
+ */
+public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy {
+
+ private final SchedulingTopology schedulingTopology;
+
+ private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>();
+
+ private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();
+
+ public RegionPartitionReleaseStrategy(
+ final SchedulingTopology schedulingTopology,
+ final Set<PipelinedRegion> pipelinedRegions) {
+
+ this.schedulingTopology = checkNotNull(schedulingTopology);
+
+ checkNotNull(pipelinedRegions);
+ initConsumedBlockingPartitionsByRegion(pipelinedRegions);
+ initRegionExecutionViewByVertex(pipelinedRegions);
+ }
+
+ private void initConsumedBlockingPartitionsByRegion(final Set<PipelinedRegion> pipelinedRegions) {
+ for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+ final PipelinedRegionConsumedBlockingPartitions consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion);
+ consumedBlockingPartitionsByRegion.put(pipelinedRegion, consumedPartitions);
+ }
+ }
+
+ private void initRegionExecutionViewByVertex(final Set<PipelinedRegion> pipelinedRegions) {
+ for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+ final PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+ for (ExecutionVertexID executionVertexId : pipelinedRegion) {
+ regionExecutionViewByVertex.put(executionVertexId, regionExecutionView);
+ }
+ }
+ }
+
+ private PipelinedRegionConsumedBlockingPartitions computeConsumedPartitionsOfVertexRegion(final PipelinedRegion pipelinedRegion) {
+ final Set<IntermediateResultPartitionID> resultPartitionsOutsideOfRegion = findResultPartitionsOutsideOfRegion(pipelinedRegion);
+ return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion);
+ }
+
+ private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
+ final Set<SchedulingResultPartition> allConsumedPartitionsInRegion = pipelinedRegion
+ .getExecutionVertexIds()
+ .stream()
+ .map(schedulingTopology::getVertexOrThrow)
+ .flatMap(schedulingExecutionVertex -> schedulingExecutionVertex.getConsumedResultPartitions().stream())
+ .collect(Collectors.toSet());
+
+ return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion);
+ }
+
+ private static Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion(
+ final Collection<SchedulingResultPartition> resultPartitions,
+ final PipelinedRegion pipelinedRegion) {
+
+ final Set<IntermediateResultPartitionID> result = new HashSet<>();
+ for (final SchedulingResultPartition maybeOutsidePartition : resultPartitions) {
+ final SchedulingExecutionVertex producer = maybeOutsidePartition.getProducer();
+ if (!pipelinedRegion.contains(producer.getId())) {
+ result.add(maybeOutsidePartition.getId());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
+ final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(finishedVertex);
+ regionExecutionView.vertexFinished(finishedVertex);
+
+ if (regionExecutionView.isFinished()) {
+ final PipelinedRegion pipelinedRegion = getPipelinedRegionForVertex(finishedVertex);
+ final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion = getConsumedBlockingPartitionsForRegion(pipelinedRegion);
+ return filterReleasablePartitions(consumedPartitionsOfVertexRegion);
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
+ final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+ regionExecutionView.vertexUnfinished(executionVertexId);
+ }
+
+ private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(final ExecutionVertexID executionVertexId) {
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = regionExecutionViewByVertex.get(executionVertexId);
+ checkState(pipelinedRegionExecutionView != null,
+ "PipelinedRegionExecutionView not found for execution vertex %s", executionVertexId);
+ return pipelinedRegionExecutionView;
+ }
+
+ private PipelinedRegion getPipelinedRegionForVertex(final ExecutionVertexID executionVertexId) {
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+ return pipelinedRegionExecutionView.getPipelinedRegion();
+ }
+
+ private PipelinedRegionConsumedBlockingPartitions getConsumedBlockingPartitionsForRegion(final PipelinedRegion pipelinedRegion) {
+ final PipelinedRegionConsumedBlockingPartitions pipelinedRegionConsumedBlockingPartitions = consumedBlockingPartitionsByRegion.get(pipelinedRegion);
+ checkState(pipelinedRegionConsumedBlockingPartitions != null,
+ "Consumed partitions not found for pipelined region %s", pipelinedRegion);
+ checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == pipelinedRegion);
+ return pipelinedRegionConsumedBlockingPartitions;
+ }
+
+ private List<IntermediateResultPartitionID> filterReleasablePartitions(final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion) {
+ return consumedPartitionsOfVertexRegion
+ .getConsumedBlockingPartitions()
+ .stream()
+ .filter(this::areConsumerRegionsFinished)
+ .collect(Collectors.toList());
+ }
+
+ private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) {
+ final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+ final Collection<SchedulingExecutionVertex> consumers = resultPartition.getConsumers();
+ return consumers
+ .stream()
+ .map(SchedulingExecutionVertex::getId)
+ .allMatch(this::isRegionOfVertexFinished);
+ }
+
+ private boolean isRegionOfVertexFinished(final ExecutionVertexID executionVertexId) {
+ final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
+ return regionExecutionView.isFinished();
+ }
+
+ /**
+ * Factory for {@link PartitionReleaseStrategy}.
+ */
+ public static class Factory implements PartitionReleaseStrategy.Factory {
+
+ @Override
+ public PartitionReleaseStrategy createInstance(
+ final SchedulingTopology schedulingStrategy,
+ final FailoverTopology failoverTopology) {
+
+ final Set<Set<FailoverVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(failoverTopology);
+ return new RegionPartitionReleaseStrategy(
+ schedulingStrategy,
+ PipelinedRegionComputeUtil.toPipelinedRegionsSet(distinctRegions));
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
new file mode 100644
index 0000000..0110642
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+/**
+ * Tests for the interactions of the {@link ExecutionGraph} and {@link PartitionReleaseStrategy}.
+ */
+public class ExecutionGraphPartitionReleaseTest extends TestLogger {
+
+ private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private static final TestingComponentMainThreadExecutor mainThreadExecutor =
+ new TestingComponentMainThreadExecutor(
+ TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
+
+ @Test
+ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
+ // setup a simple pipeline of 3 operators with blocking partitions
+ final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ final JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+
+ operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+ sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+ // setup partition tracker to intercept partition release calls
+ final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+ final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
+ partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
+ partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));
+
+ final ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, sinkVertex);
+
+ // finish vertices one after another, and verify that the appropriate partitions are released
+ mainThreadExecutor.execute(() -> {
+ final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+ final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, hasSize(1));
+ assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
+ sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+ sourceExecution.getAttemptId())));
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
+ final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph);
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), ExecutionState.FINISHED));
+
+ assertThat(releasedPartitions, hasSize(1));
+ assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
+ operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+ operatorExecution.getAttemptId())));
+ });
+ }
+
+ @Test
+ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
+ // setup a pipeline of 2 failover regions (f1 -> f2), where
+ // f1 is just a source
+ // f2 consists of 3 operators (o1,o2,o3), where o1 consumes f1, and o2/o3 consume o1
+ final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
+ final JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
+ final JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
+ final JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
+
+ operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+ operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ // setup partition tracker to intercept partition release calls
+ final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+ final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
+ partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
+ partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));
+
+ final ExecutionGraph executionGraph = createExecutionGraph(
+ partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);
+
+ mainThreadExecutor.execute(() -> {
+ final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
+ // finish the source; this should not result in any release calls since the consumer o1 was not finished
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
+ // finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished
+ for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
+ executionGraph.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
+ }
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
+ // finish o2; this should not result in any release calls since o3 was not finished
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator2Execution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
+ // reset o2
+ operator2Execution.getVertex().resetForNewExecution(0L, 1L);
+ assertThat(releasedPartitions, empty());
+ });
+
+ mainThreadExecutor.execute(() -> {
+ final Execution operator3Execution = getCurrentExecution(operator3Vertex, executionGraph);
+ // finish o3; this should not result in any release calls since o2 was reset
+ executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator3Execution.getAttemptId(), ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+ }
+
+ private static Execution getCurrentExecution(final JobVertex jobVertex, final ExecutionGraph executionGraph) {
+ return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+ }
+
+ private ExecutionGraph createExecutionGraph(final PartitionTracker partitionTracker, final JobVertex... vertices) throws Exception {
+ final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+ null,
+ new JobGraph(new JobID(), "test job", vertices),
+ new Configuration(),
+ scheduledExecutorService,
+ mainThreadExecutor.getMainThreadExecutor(),
+ new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())),
+ ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ AkkaUtils.getDefaultTimeout(),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ VoidBlobWriter.getInstance(),
+ AkkaUtils.getDefaultTimeout(),
+ log,
+ NettyShuffleMaster.INSTANCE,
+ partitionTracker);
+
+ executionGraph.start(mainThreadExecutor.getMainThreadExecutor());
+ mainThreadExecutor.execute(executionGraph::scheduleForExecution);
+
+ return executionGraph;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
new file mode 100644
index 0000000..1b9ca7a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link RegionPartitionReleaseStrategy}.
+ */
+public class RegionPartitionReleaseStrategyTest extends TestLogger {
+
+ private TestingSchedulingTopology testingSchedulingTopology;
+
+ @Before
+ public void setUp() throws Exception {
+ testingSchedulingTopology = new TestingSchedulingTopology();
+ }
+
+ @Test
+ public void releasePartitionsIfDownstreamRegionIsFinished() {
+ final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingResultPartition> resultPartitions = testingSchedulingTopology.connectPointwise(producers, consumers).finish();
+
+ final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+ final ExecutionVertexID onlyConsumerVertexId = consumers.get(0).getId();
+ final IntermediateResultPartitionID onlyResultPartitionId = resultPartitions.get(0).getId();
+
+ final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+ PipelinedRegion.from(onlyProducerVertexId),
+ PipelinedRegion.from(onlyConsumerVertexId));
+
+ final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+ final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId);
+ assertThat(partitionsToRelease, contains(onlyResultPartitionId));
+ }
+
+ @Test
+ public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
+ final List<TestingSchedulingExecutionVertex> sourceVertices = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingExecutionVertex> intermediateVertices = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingExecutionVertex> sinkVertices = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingResultPartition> sourceResultPartitions = testingSchedulingTopology.connectAllToAll(sourceVertices, intermediateVertices).finish();
+ testingSchedulingTopology.connectAllToAll(intermediateVertices, sinkVertices).withResultPartitionType(ResultPartitionType.PIPELINED).finish();
+
+ final ExecutionVertexID onlySourceVertexId = sourceVertices.get(0).getId();
+ final ExecutionVertexID onlyIntermediateVertexId = intermediateVertices.get(0).getId();
+ final ExecutionVertexID onlySinkVertexId = sinkVertices.get(0).getId();
+ final IntermediateResultPartitionID onlySourceResultPartitionId = sourceResultPartitions.get(0).getId();
+
+ final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+ PipelinedRegion.from(onlySourceVertexId),
+ PipelinedRegion.from(onlyIntermediateVertexId, onlySinkVertexId));
+
+ final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+ regionPartitionReleaseStrategy.vertexFinished(onlyIntermediateVertexId);
+ final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlySinkVertexId);
+ assertThat(partitionsToRelease, contains(onlySourceResultPartitionId));
+ }
+
+ @Test
+ public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
+ final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+ testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
+
+ final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+ final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
+ final ExecutionVertexID consumerVertex2 = consumers.get(1).getId();
+
+ final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+ PipelinedRegion.from(onlyProducerVertexId),
+ PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+ final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+ final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+ assertThat(partitionsToRelease, is(empty()));
+ }
+
+ @Test
+ public void toggleVertexFinishedUnfinished() {
+ final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
+ final List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+ testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
+
+ final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId();
+ final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
+ final ExecutionVertexID consumerVertex2 = consumers.get(1).getId();
+
+ final Set<PipelinedRegion> pipelinedRegions = pipelinedRegionsSet(
+ PipelinedRegion.from(onlyProducerVertexId),
+ PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+ final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+ regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+ regionPartitionReleaseStrategy.vertexFinished(consumerVertex2);
+ regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
+
+ final List<IntermediateResultPartitionID> partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+ assertThat(partitionsToRelease, is(empty()));
+ }
+
+ private static Set<PipelinedRegion> pipelinedRegionsSet(final PipelinedRegion... pipelinedRegions) {
+ return new HashSet<>(Arrays.asList(pipelinedRegions));
+ }
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
new file mode 100644
index 0000000..c108938
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PartitionReleaseStrategyFactoryLoader}.
+ */
+public class PartitionReleaseStrategyFactoryLoaderTest {
+
+ @Test
+ public void featureEnabledByDefault() {
+ final Configuration emptyConfiguration = new Configuration();
+ final PartitionReleaseStrategy.Factory factory =
+ PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+ assertThat(factory, is(instanceOf(RegionPartitionReleaseStrategy.Factory.class)));
+ }
+
+ @Test
+ public void featureCanBeDisabled() {
+ final Configuration emptyConfiguration = new Configuration();
+ emptyConfiguration.setBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION, false);
+
+ final PartitionReleaseStrategy.Factory factory =
+ PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+ assertThat(factory, is(instanceOf(NotReleasingPartitionReleaseStrategy.Factory.class)));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
new file mode 100644
index 0000000..a6960ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PipelinedRegionExecutionView}.
+ */
+public class PipelinedRegionExecutionViewTest extends TestLogger {
+
+ private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID = new ExecutionVertexID(new JobVertexID(), 0);
+
+ @Test
+ public void regionIsUnfinishedIfNotAllVerticesAreFinished() {
+ final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+ assertFalse(pipelinedRegionExecutionView.isFinished());
+ }
+
+ @Test
+ public void regionIsFinishedIfAllVerticesAreFinished() {
+ final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+ pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+
+ assertTrue(pipelinedRegionExecutionView.isFinished());
+ }
+
+ @Test
+ public void vertexCanBeUnfinished() {
+ final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
+
+ pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+ pipelinedRegionExecutionView.vertexUnfinished(TEST_EXECUTION_VERTEX_ID);
+
+ assertFalse(pipelinedRegionExecutionView.isFinished());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void finishingUnknownVertexThrowsException() {
+ final PipelinedRegion from = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+ final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(from);
+
+ final ExecutionVertexID unknownVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+ pipelinedRegionExecutionView.vertexFinished(unknownVertexId);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 1aaac58..e0ea6c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -79,18 +79,18 @@ public class TestingSchedulingTopology implements SchedulingTopology {
}
}
- SchedulingExecutionVerticesBuilder addExecutionVertices() {
+ public SchedulingExecutionVerticesBuilder addExecutionVertices() {
return new SchedulingExecutionVerticesBuilder();
}
- ProducerConsumerConnectionBuilder connectPointwise(
+ public ProducerConsumerConnectionBuilder connectPointwise(
final List<TestingSchedulingExecutionVertex> producers,
final List<TestingSchedulingExecutionVertex> consumers) {
return new ProducerConsumerPointwiseConnectionBuilder(producers, consumers);
}
- ProducerConsumerConnectionBuilder connectAllToAll(
+ public ProducerConsumerConnectionBuilder connectAllToAll(
final List<TestingSchedulingExecutionVertex> producers,
final List<TestingSchedulingExecutionVertex> consumers) {
@@ -117,12 +117,12 @@ public class TestingSchedulingTopology implements SchedulingTopology {
this.consumers = consumers;
}
- ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) {
+ public ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) {
this.resultPartitionType = resultPartitionType;
return this;
}
- ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) {
+ public ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) {
this.resultPartitionState = state;
return this;
}
@@ -229,12 +229,12 @@ public class TestingSchedulingTopology implements SchedulingTopology {
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
- SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) {
+ public SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) {
this.parallelism = parallelism;
return this;
}
- SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) {
+ public SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) {
this.inputDependencyConstraint = inputDependencyConstraint;
return this;
}