You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text version.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/03 23:03:36 UTC
[flink] 04/04: [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a682ff7bfa6f0073759ec716b09bf6e06a3d36b
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jul 3 20:20:34 2019 +0200
[FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling
Implement an adapter (AdaptedRestartPipelinedRegionStrategyNG) that adapts
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
to the legacy failover strategy interface
(org.apache.flink.runtime.executiongraph.failover.FailoverStrategy).
The new AdaptedRestartPipelinedRegionStrategyNG is chosen if the config option
jobmanager.execution.failover-strategy is set to "region". The legacy strategy
(RestartPipelinedRegionStrategy) can be enabled by setting the config option to "region-legacy".
This closes #8922.
---
.../runtime/executiongraph/ExecutionGraph.java | 156 +------
.../runtime/executiongraph/SchedulingUtils.java | 218 ++++++++++
.../AdaptedRestartPipelinedRegionStrategyNG.java | 316 ++++++++++++++
.../failover/FailoverStrategyLoader.java | 8 +-
.../runtime/scheduler/ExecutionVertexVersion.java | 45 ++
.../scheduler/ExecutionVertexVersioner.java | 76 ++++
...egionStrategyNGAbortPendingCheckpointsTest.java | 171 ++++++++
...inedRegionStrategyNGConcurrentFailoverTest.java | 284 +++++++++++++
...startPipelinedRegionStrategyNGFailoverTest.java | 459 +++++++++++++++++++++
.../executiongraph/ExecutionGraphRestartTest.java | 108 ++++-
.../PipelinedFailoverRegionBuildingTest.java | 2 +-
...RestartPipelinedRegionStrategyBuildingTest.java | 4 +-
.../scheduler/ExecutionVertexVersionerTest.java | 121 ++++++
13 files changed, 1807 insertions(+), 161 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ce65b68..0840c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
@@ -65,9 +64,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
@@ -108,16 +104,13 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -952,11 +945,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
switch (scheduleMode) {
case LAZY_FROM_SOURCES:
- newSchedulingFuture = scheduleLazy(slotProvider);
+ newSchedulingFuture = scheduleLazy();
break;
case EAGER:
- newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
+ newSchedulingFuture = scheduleEager();
break;
default:
@@ -985,123 +978,16 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
- private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
-
- final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
- // simply take the vertices without inputs.
- for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- if (ejv.getJobVertex().isInputVertex()) {
- final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
- slotProvider,
- allowQueuedScheduling,
- LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
- Collections.emptySet());
-
- schedulingFutures.add(schedulingJobVertexFuture);
- }
- }
-
- return FutureUtils.waitForAll(schedulingFutures);
+ private CompletableFuture<Void> scheduleLazy() {
+ return SchedulingUtils.scheduleLazy(getAllExecutionVertices(), this);
}
/**
- *
- *
- * @param slotProvider The resource provider from which the slots are allocated
- * @param timeout The maximum time that the deployment may take, before a
- * TimeoutException is thrown.
* @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
* The future can also be completed exceptionally if an error happened.
*/
- private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
- assertRunningInJobMasterMainThread();
- checkState(state == JobStatus.RUNNING, "job is not running currently");
-
- // Important: reserve all the space we need up front.
- // that way we do not have any operation that can fail between allocating the slots
- // and adding them to the list. If we had a failure in between there, that would
- // cause the slots to get lost
- final boolean queued = allowQueuedScheduling;
-
- // collecting all the slots may resize and fail in that operation without slots getting lost
- final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
-
- final Set<AllocationID> allPreviousAllocationIds =
- Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
-
- // allocate the slots (obtain all their futures
- for (ExecutionJobVertex ejv : getVerticesTopologically()) {
- // these calls are not blocking, they only return futures
- Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
- slotProvider,
- queued,
- LocationPreferenceConstraint.ALL,
- allPreviousAllocationIds,
- timeout);
-
- allAllocationFutures.addAll(allocationFutures);
- }
-
- // this future is complete once all slot futures are complete.
- // the future fails once one slot future fails.
- final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
-
- return allAllocationsFuture.thenAccept(
- (Collection<Execution> executionsToDeploy) -> {
- for (Execution execution : executionsToDeploy) {
- try {
- execution.deploy();
- } catch (Throwable t) {
- throw new CompletionException(
- new FlinkException(
- String.format("Could not deploy execution %s.", execution),
- t));
- }
- }
- })
- // Generate a more specific failure message for the eager scheduling
- .exceptionally(
- (Throwable throwable) -> {
- final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- final Throwable resultThrowable;
- if (strippedThrowable instanceof TimeoutException) {
- int numTotal = allAllocationsFuture.getNumFuturesTotal();
- int numComplete = allAllocationsFuture.getNumFuturesCompleted();
-
- String message = "Could not allocate all requires slots within timeout of " +
- timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete +
- ", previous allocation IDs: " + allPreviousAllocationIds;
-
- StringBuilder executionMessageBuilder = new StringBuilder();
-
- for (int i = 0; i < allAllocationFutures.size(); i++) {
- CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);
-
- try {
- Execution execution = executionFuture.getNow(null);
- if (execution != null) {
- executionMessageBuilder.append("completed: " + execution);
- } else {
- executionMessageBuilder.append("incomplete: " + executionFuture);
- }
- } catch (CompletionException completionException) {
- executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
- }
-
- if (i < allAllocationFutures.size() - 1) {
- executionMessageBuilder.append(", ");
- }
- }
-
- message += ", execution status: " + executionMessageBuilder.toString();
-
- resultThrowable = new NoResourceAvailableException(message);
- } else {
- resultThrowable = strippedThrowable;
- }
-
- throw new CompletionException(resultThrowable);
- });
+ private CompletableFuture<Void> scheduleEager() {
+ return SchedulingUtils.scheduleEager(getAllExecutionVertices(), this);
}
public void cancel() {
@@ -1414,7 +1300,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
* and is used to disambiguate concurrent modifications between local and global
* failover actions.
*/
- long getGlobalModVersion() {
+ public long getGlobalModVersion() {
return globalModVersion;
}
@@ -1808,34 +1694,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
- /**
- * Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
- */
- private Set<AllocationID> computeAllPriorAllocationIds() {
- HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>(getNumberOfExecutionJobVertices());
- for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
- AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
- if (latestPriorAllocation != null) {
- allPreviousAllocationIds.add(latestPriorAllocation);
- }
- }
- return allPreviousAllocationIds;
- }
-
- /**
- * Returns the result of {@link #computeAllPriorAllocationIds()}, but only if the scheduling really requires it.
- * Otherwise this method simply returns an empty set.
- */
- private Set<AllocationID> computeAllPriorAllocationIdsIfRequiredByScheduling() {
- // This is a temporary optimization to avoid computing all previous allocations if not required
- // This can go away when we progress with the implementation of the Scheduler.
- if (slotProvider instanceof Scheduler && ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) {
- return computeAllPriorAllocationIds();
- } else {
- return Collections.emptySet();
- }
- }
-
// --------------------------------------------------------------------------------------------
// Listeners & Observers
// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
new file mode 100644
index 0000000..950fcb1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES.
+ * It is used for normal scheduling and legacy failover strategy re-scheduling.
+ *
+ * <p>This is a static utility class and cannot be instantiated.
+ */
+public final class SchedulingUtils {
+
+	/** Not meant to be instantiated; all methods are static. */
+	private SchedulingUtils() {
+	}
+
+	/**
+	 * Schedule vertices lazy. That means only vertices satisfying its input constraint will be scheduled.
+	 *
+	 * <p>A vertex is scheduled if its job vertex is an input vertex, or if
+	 * {@code ExecutionVertex#checkInputDependencyConstraints()} holds for it. Scheduling uses
+	 * {@link LocationPreferenceConstraint#ANY}.
+	 *
+	 * @param vertices Topologically sorted vertices to schedule.
+	 * @param executionGraph The graph the given vertices belong to.
+	 * @return Future combining (via {@code FutureUtils.waitForAll}) the scheduling futures of
+	 *         all vertices that were scheduled.
+	 */
+	public static CompletableFuture<Void> scheduleLazy(
+			final Iterable<ExecutionVertex> vertices,
+			final ExecutionGraph executionGraph) {
+
+		executionGraph.assertRunningInJobMasterMainThread();
+
+		// prior allocations are only computed if the slot provider actually makes use of them
+		final Set<AllocationID> previousAllocations = computePriorAllocationIdsIfRequiredByScheduling(
+			vertices, executionGraph.getSlotProvider());
+
+		final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>();
+		for (ExecutionVertex executionVertex : vertices) {
+			// only schedule vertex when its input constraint is satisfied
+			if (executionVertex.getJobVertex().getJobVertex().isInputVertex() ||
+					executionVertex.checkInputDependencyConstraints()) {
+
+				final CompletableFuture<Void> schedulingVertexFuture = executionVertex.scheduleForExecution(
+					executionGraph.getSlotProvider(),
+					executionGraph.isQueuedSchedulingAllowed(),
+					LocationPreferenceConstraint.ANY,
+					previousAllocations);
+
+				schedulingFutures.add(schedulingVertexFuture);
+			}
+		}
+
+		return FutureUtils.waitForAll(schedulingFutures);
+	}
+
+	/**
+	 * Schedule vertices eagerly. That means all vertices will be scheduled at once.
+	 *
+	 * <p>Slots are first requested for the current execution attempt of every vertex. The
+	 * executions are deployed only after all allocation futures have completed. If the
+	 * allocation fails with a {@link TimeoutException}, the failure is reported as a
+	 * {@link NoResourceAvailableException} describing the state of every allocation.
+	 *
+	 * @param vertices Topologically sorted vertices to schedule.
+	 * @param executionGraph The graph the given vertices belong to.
+	 * @return Future which completes once all executions have been deployed; it completes
+	 *         exceptionally if allocation or deployment fails.
+	 * @throws IllegalStateException if the job is not in state {@link JobStatus#RUNNING}.
+	 */
+	public static CompletableFuture<Void> scheduleEager(
+			final Iterable<ExecutionVertex> vertices,
+			final ExecutionGraph executionGraph) {
+
+		executionGraph.assertRunningInJobMasterMainThread();
+
+		checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");
+
+		final boolean queued = executionGraph.isQueuedSchedulingAllowed();
+
+		// the allocation futures of all vertices, in the iteration order of the given vertices
+		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();
+
+		final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
+			computePriorAllocationIdsIfRequiredByScheduling(vertices, executionGraph.getSlotProvider()));
+
+		// allocate the slots (obtain all their futures)
+		for (ExecutionVertex ev : vertices) {
+			// these calls are not blocking, they only return futures
+			CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
+				executionGraph.getSlotProvider(),
+				queued,
+				LocationPreferenceConstraint.ALL,
+				allPreviousAllocationIds,
+				executionGraph.getAllocationTimeout());
+
+			allAllocationFutures.add(allocationFuture);
+		}
+
+		// this future is complete once all slot futures are complete.
+		// the future fails once one slot future fails.
+		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
+
+		return allAllocationsFuture.thenAccept(
+			(Collection<Execution> executionsToDeploy) -> {
+				for (Execution execution : executionsToDeploy) {
+					try {
+						execution.deploy();
+					} catch (Throwable t) {
+						throw new CompletionException(
+							new FlinkException(
+								String.format("Could not deploy execution %s.", execution),
+								t));
+					}
+				}
+			})
+			// Generate a more specific failure message for the eager scheduling
+			.exceptionally(
+				(Throwable throwable) -> {
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					final Throwable resultThrowable;
+					if (strippedThrowable instanceof TimeoutException) {
+						int numTotal = allAllocationsFuture.getNumFuturesTotal();
+						int numComplete = allAllocationsFuture.getNumFuturesCompleted();
+
+						String message = "Could not allocate all required slots within timeout of "
+							+ executionGraph.getAllocationTimeout() + ". Slots required: "
+							+ numTotal + ", slots allocated: " + numComplete
+							+ ", previous allocation IDs: " + allPreviousAllocationIds;
+
+						StringBuilder executionMessageBuilder = new StringBuilder();
+
+						for (int i = 0; i < allAllocationFutures.size(); i++) {
+							CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);
+
+							try {
+								Execution execution = executionFuture.getNow(null);
+								if (execution != null) {
+									executionMessageBuilder.append("completed: " + execution);
+								} else {
+									executionMessageBuilder.append("incomplete: " + executionFuture);
+								}
+							} catch (CompletionException completionException) {
+								executionMessageBuilder.append("completed exceptionally: "
+									+ completionException + "/" + executionFuture);
+							}
+
+							if (i < allAllocationFutures.size() - 1) {
+								executionMessageBuilder.append(", ");
+							}
+						}
+
+						message += ", execution status: " + executionMessageBuilder.toString();
+
+						resultThrowable = new NoResourceAvailableException(message);
+					} else {
+						resultThrowable = strippedThrowable;
+					}
+
+					throw new CompletionException(resultThrowable);
+				});
+	}
+
+	/**
+	 * Returns the result of {@link #computePriorAllocationIds(Iterable)},
+	 * but only if the scheduling really requires it.
+	 * Otherwise this method simply returns an empty set.
+	 *
+	 * @param vertices The vertices whose prior allocations may be collected.
+	 * @param slotProvider The slot provider; prior allocations are only computed if it is a
+	 *                     {@link Scheduler} that requires previous execution graph allocations.
+	 * @return The prior allocation ids, or an empty set if they are not required.
+	 */
+	private static Set<AllocationID> computePriorAllocationIdsIfRequiredByScheduling(
+			final Iterable<ExecutionVertex> vertices,
+			final SlotProvider slotProvider) {
+		// This is a temporary optimization to avoid computing all previous allocations if not required
+		// This can go away when we progress with the implementation of the Scheduler.
+		if (slotProvider instanceof Scheduler &&
+				((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) {
+
+			return computePriorAllocationIds(vertices);
+		} else {
+			return Collections.emptySet();
+		}
+	}
+
+	/**
+	 * Computes and returns a set with the prior allocation ids for given execution vertices.
+	 * Vertices without a latest prior allocation ({@code null}) are skipped.
+	 *
+	 * @param vertices The vertices to collect the latest prior allocation ids from.
+	 * @return A mutable set of the collected allocation ids.
+	 */
+	private static Set<AllocationID> computePriorAllocationIds(final Iterable<ExecutionVertex> vertices) {
+		HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>();
+		for (ExecutionVertex executionVertex : vertices) {
+			AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+			if (latestPriorAllocation != null) {
+				allPreviousAllocationIds.add(latestPriorAllocation);
+			}
+		}
+		return allPreviousAllocationIds;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
new file mode 100644
index 0000000..5fddac3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.SchedulingUtils;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+ /** The execution graph on which this FailoverStrategy works. */
+ private final ExecutionGraph executionGraph;
+
+ /** The versioner helps to maintain execution vertex versions. */
+ private final ExecutionVertexVersioner executionVertexVersioner;
+
+ /** The underlying new generation region failover strategy. */
+ private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+ public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
+ this.executionGraph = checkNotNull(executionGraph);
+ this.executionVertexVersioner = new ExecutionVertexVersioner();
+ }
+
+ @Override
+ public void onTaskFailure(final Execution taskExecution, final Throwable cause) {
+ if (!executionGraph.getRestartStrategy().canRestart()) {
+ // delegate the failure to a global fail that will check the restart strategy and not restart
+ LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global.");
+ failGlobal(cause);
+ return;
+ }
+
+ if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+ LOG.info("Skip current region failover as a global failover is ongoing.");
+ return;
+ }
+
+ final ExecutionVertexID vertexID = getExecutionVertexID(taskExecution.getVertex());
+
+ final Set<ExecutionVertexID> tasksToRestart = restartPipelinedRegionStrategy.getTasksNeedingRestart(vertexID, cause);
+ restartTasks(tasksToRestart);
+ }
+
+ @VisibleForTesting
+ protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
+ final long globalModVersion = executionGraph.getGlobalModVersion();
+ final Set<ExecutionVertexVersion> vertexVersions = new HashSet<>(
+ executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+ FutureUtils.assertNoException(
+ cancelTasks(verticesToRestart)
+ .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
+ .handle(failGlobalOnError()));
+ }
+
+ private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+ return () -> {
+ if (!isLocalFailoverValid(globalModVersion)) {
+ LOG.info("Skip current region failover as a global failover is ongoing.");
+ return;
+ }
+
+ // found out vertices which are still valid to restart.
+ // some vertices involved in this failover may be modified if another region
+ // failover happens during the cancellation stage of this failover.
+ // Will ignore the modified vertices as the other failover will deal with them.
+ final Set<ExecutionVertex> unmodifiedVertices = executionVertexVersioner
+ .getUnmodifiedExecutionVertices(vertexVersions)
+ .stream()
+ .map(this::getExecutionVertex)
+ .collect(Collectors.toSet());
+
+ try {
+ LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());
+
+ // reset tasks to CREATED state and reload state
+ resetTasks(unmodifiedVertices, globalModVersion);
+
+ // re-schedule tasks
+ rescheduleTasks(unmodifiedVertices, globalModVersion);
+ } catch (GlobalModVersionMismatch e) {
+ throw new IllegalStateException(
+ "Bug: ExecutionGraph was concurrently modified outside of main thread", e);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ };
+ }
+
+ private BiFunction<Object, Throwable, Object> failGlobalOnError() {
+ return (Object ignored, Throwable t) -> {
+ if (t != null) {
+ LOG.info("Unexpected error happens in region failover. Fail globally.", t);
+ failGlobal(t);
+ }
+ return null;
+ };
+ }
+
+ @VisibleForTesting
+ protected CompletableFuture<?> cancelTasks(final Set<ExecutionVertexID> vertices) {
+ final List<CompletableFuture<?>> cancelFutures = vertices.stream()
+ .map(this::cancelExecutionVertex)
+ .collect(Collectors.toList());
+
+ return FutureUtils.combineAll(cancelFutures);
+ }
+
+ private void resetTasks(final Set<ExecutionVertex> vertices, final long globalModVersion) throws Exception {
+ final Set<CoLocationGroup> colGroups = new HashSet<>();
+ final long restartTimestamp = System.currentTimeMillis();
+
+ for (ExecutionVertex ev : vertices) {
+ CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
+ if (cgroup != null && !colGroups.contains(cgroup)){
+ cgroup.resetConstraints();
+ colGroups.add(cgroup);
+ }
+
+ ev.resetForNewExecution(restartTimestamp, globalModVersion);
+ }
+
+ // if there is checkpointed state, reload it into the executions
+ if (executionGraph.getCheckpointCoordinator() != null) {
+ // abort pending checkpoints to
+ // i) enable new checkpoint triggering without waiting for last checkpoint expired.
+ // ii) ensure the EXACTLY_ONCE semantics if needed.
+ executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+ new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+
+ final Map<JobVertexID, ExecutionJobVertex> involvedExecutionJobVertices =
+ getInvolvedExecutionJobVertices(vertices);
+ executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+ involvedExecutionJobVertices, false, true);
+ }
+ }
+
+ private void rescheduleTasks(final Set<ExecutionVertex> vertices, final long globalModVersion) throws Exception {
+
+ // sort vertices topologically
+ // this is to reduce the possibility that downstream tasks get launched earlier,
+ // which may cause lots of partition state checks in EAGER mode
+ final List<ExecutionVertex> sortedVertices = sortVerticesTopologically(vertices);
+
+ final CompletableFuture<Void> newSchedulingFuture;
+ switch (executionGraph.getScheduleMode()) {
+
+ case LAZY_FROM_SOURCES:
+ newSchedulingFuture = SchedulingUtils.scheduleLazy(sortedVertices, executionGraph);
+ break;
+
+ case EAGER:
+ newSchedulingFuture = SchedulingUtils.scheduleEager(sortedVertices, executionGraph);
+ break;
+
+ default:
+ throw new JobException("Schedule mode is invalid.");
+ }
+
+ // if no global failover is triggered in the scheduling process,
+ // register a failure handling callback to the scheduling
+ if (isLocalFailoverValid(globalModVersion)) {
+ newSchedulingFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+ if (!(strippedThrowable instanceof CancellationException)) {
+ // only fail if the scheduling future was not canceled
+ failGlobal(strippedThrowable);
+ }
+ }
+ });
+ }
+ }
+
+ private boolean isLocalFailoverValid(final long globalModVersion) {
+ // local failover is valid only if the job is still RUNNING and
+ // no global failover happens since the given globalModVersion is recorded
+ return executionGraph.getState() == JobStatus.RUNNING &&
+ executionGraph.getGlobalModVersion() == globalModVersion;
+ }
+
+ private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+ return getExecutionVertex(executionVertexId).cancel();
+ }
+
+ private Map<JobVertexID, ExecutionJobVertex> getInvolvedExecutionJobVertices(
+ final Set<ExecutionVertex> executionVertices) {
+
+ Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+ for (ExecutionVertex executionVertex : executionVertices) {
+ JobVertexID jobvertexId = executionVertex.getJobvertexId();
+ ExecutionJobVertex jobVertex = executionVertex.getJobVertex();
+ tasks.putIfAbsent(jobvertexId, jobVertex);
+ }
+ return tasks;
+ }
+
+ private void failGlobal(final Throwable cause) {
+ executionGraph.failGlobal(cause);
+ }
+
+ private ExecutionVertex getExecutionVertex(final ExecutionVertexID vertexID) {
+ return executionGraph.getAllVertices()
+ .get(vertexID.getJobVertexId())
+ .getTaskVertices()[vertexID.getSubtaskIndex()];
+ }
+
+ private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) {
+ return new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex());
+ }
+
+ private List<ExecutionVertex> sortVerticesTopologically(final Set<ExecutionVertex> vertices) {
+ // org execution vertex by jobVertexId
+ final Map<JobVertexID, List<ExecutionVertex>> verticesMap = new HashMap<>();
+ for (ExecutionVertex vertex : vertices) {
+ verticesMap.computeIfAbsent(vertex.getJobvertexId(), id -> new ArrayList<>()).add(vertex);
+ }
+
+ // sort in jobVertex topological order
+ final List<ExecutionVertex> sortedVertices = new ArrayList<>(vertices.size());
+ for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
+ sortedVertices.addAll(verticesMap.getOrDefault(jobVertex.getJobVertexId(), Collections.emptyList()));
+ }
+ return sortedVertices;
+ }
+
+ @Override
+ public void notifyNewVertices(final List<ExecutionJobVertex> newJobVerticesTopological) {
+ // build the underlying new generation failover strategy when the executionGraph vertices are all added,
+ // otherwise the failover topology will not be correctly built.
+ // currently it's safe to add it here, as this method is invoked only once in production code.
+ checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
+ this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
+ new DefaultFailoverTopology(executionGraph));
+ }
+
+ @Override
+ public String getStrategyName() {
+ return "New Pipelined Region Failover";
+ }
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory that instantiates the AdaptedRestartPipelinedRegionStrategyNG.
+ */
+ public static class Factory implements FailoverStrategy.Factory {
+
+ @Override
+ public FailoverStrategy create(final ExecutionGraph executionGraph) {
+ return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index eb003b9..e80c6ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -38,9 +38,12 @@ public class FailoverStrategyLoader {
/** Config name for the {@link RestartIndividualStrategy}. */
public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
- /** Config name for the {@link RestartPipelinedRegionStrategy} */
+ /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";
+ /** Config name for the {@link RestartPipelinedRegionStrategy}. */
+ public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy";
+
// ------------------------------------------------------------------------
/**
@@ -63,6 +66,9 @@ public class FailoverStrategyLoader {
return new RestartAllStrategy.Factory();
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+ return new AdaptedRestartPipelinedRegionStrategyNG.Factory();
+
+ case LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new RestartPipelinedRegionStrategy.Factory();
case INDIVIDUAL_RESTART_STRATEGY_NAME:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java
new file mode 100644
index 0000000..28ae3cf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * Snapshot of the version of a single execution vertex, as created by
+ * {@link ExecutionVertexVersioner}. Both the vertex id and the version are final.
+ */
+public class ExecutionVertexVersion {
+
+ /** Id of the vertex this version belongs to. */
+ private final ExecutionVertexID executionVertexId;
+
+ /** Version of the vertex at the time this snapshot was created. */
+ private final long version;
+
+ /**
+ * Creates a version snapshot; package-private because snapshots are handed out by the versioner.
+ *
+ * @param vertexId id of the versioned vertex
+ * @param recordedVersion version recorded for the vertex
+ */
+ ExecutionVertexVersion(final ExecutionVertexID vertexId, final long recordedVersion) {
+ executionVertexId = vertexId;
+ version = recordedVersion;
+ }
+
+ /**
+ * @return id of the vertex this version belongs to
+ */
+ public ExecutionVertexID getExecutionVertexId() {
+ return this.executionVertexId;
+ }
+
+ /**
+ * @return the version captured by this snapshot
+ */
+ public long getVersion() {
+ return this.version;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
new file mode 100644
index 0000000..081b6ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Keeps a modification counter per
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertex}. Recording a
+ * modification increments the vertex's counter and returns an {@link ExecutionVertexVersion}
+ * snapshot; a snapshot counts as modified once a later modification was recorded for its vertex.
+ *
+ * <p>Examples for modifications include:
+ * <ul>
+ * <li>cancellation of the underlying execution
+ * <li>deployment of the execution vertex
+ * </ul>
+ *
+ * <p>The counters live in a plain {@link HashMap} without synchronization; presumably the versioner is
+ * only accessed from a single (main) thread — confirm at the call sites.
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
+
+ /** Latest recorded version per vertex; a vertex is absent until its first modification. */
+ private final Map<ExecutionVertexID, Long> executionVertexToVersion = new HashMap<>();
+
+ /**
+ * Records one modification of the given vertex.
+ *
+ * @param executionVertexId the modified vertex
+ * @return the new version of the vertex; the first recorded modification yields version 1
+ */
+ public ExecutionVertexVersion recordModification(final ExecutionVertexID executionVertexId) {
+ final long incrementedVersion = executionVertexToVersion.merge(executionVertexId, 1L, Long::sum);
+ return new ExecutionVertexVersion(executionVertexId, incrementedVersion);
+ }
+
+ /**
+ * Records one modification for each of the given vertices.
+ *
+ * @param vertices the modified vertices
+ * @return the new version of every vertex, keyed by vertex id
+ * @throws IllegalStateException if {@code vertices} contains an id more than once (raised by
+ * {@link Collectors#toMap}; the counters of the duplicates have been incremented by then)
+ */
+ public Map<ExecutionVertexID, ExecutionVertexVersion> recordVertexModifications(
+ final Collection<ExecutionVertexID> vertices) {
+ return vertices
+ .stream()
+ .map(this::recordModification)
+ .collect(Collectors.toMap(
+ ExecutionVertexVersion::getExecutionVertexId,
+ Function.identity()));
+ }
+
+ /**
+ * Checks whether a modification was recorded after the given snapshot was taken.
+ *
+ * @param executionVertexVersion the snapshot to check
+ * @return {@code true} if the vertex's current version differs from the snapshot's version
+ * @throws IllegalStateException if no version was ever recorded for the snapshot's vertex
+ */
+ public boolean isModified(final ExecutionVertexVersion executionVertexVersion) {
+ final ExecutionVertexID vertexId = executionVertexVersion.getExecutionVertexId();
+ final Long latestVersion = executionVertexToVersion.get(vertexId);
+ Preconditions.checkState(latestVersion != null,
+ "Execution vertex %s does not have a recorded version",
+ vertexId);
+ // unbox explicitly so that numeric values, never Long references, are compared
+ return latestVersion.longValue() != executionVertexVersion.getVersion();
+ }
+
+ /**
+ * Filters the given snapshots down to the vertices that were not modified since the snapshot.
+ *
+ * @param executionVertexVersions snapshots to check
+ * @return ids of all vertices whose snapshot is still current
+ * @throws IllegalStateException if a snapshot refers to a vertex without a recorded version
+ */
+ public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(final Set<ExecutionVertexVersion> executionVertexVersions) {
+ return executionVertexVersions
+ .stream()
+ .filter(snapshot -> !isModified(snapshot))
+ .map(ExecutionVertexVersion::getExecutionVertexId)
+ .collect(Collectors.toSet());
+ }
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
new file mode 100644
index 0000000..17271b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Verifies that {@link AdaptedRestartPipelinedRegionStrategyNG} leaves no pending checkpoints behind
+ * after a task failure causes its region to be restarted.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger {
+
+ /** Executes queued main thread actions only when the test calls {@code triggerAll()}. */
+ private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+ /** Main thread executor given to the execution graph; backed by {@link #manualMainThreadExecutor}. */
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ @Before
+ public void setUp() {
+ final Thread testThread = Thread.currentThread();
+ manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+ componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, testThread);
+ }
+
+ @Test
+ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
+ final ExecutionGraph graph = createExecutionGraph(createStreamingJobGraph());
+
+ // the job has a single job vertex; its first (presumably only) execution vertex is used
+ final Iterator<ExecutionVertex> executionVertices = graph.getAllExecutionVertices().iterator();
+ final ExecutionVertex vertex = executionVertices.next();
+
+ transitionToRunning(graph, vertex);
+
+ final CheckpointCoordinator coordinator = graph.getCheckpointCoordinator();
+ checkState(coordinator != null);
+
+ coordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+ final int numPendingBeforeFailure = coordinator.getNumberOfPendingCheckpoints();
+
+ failAndRunMainThreadActions(vertex);
+
+ // exactly one checkpoint was pending before the failure, none is left afterwards
+ assertThat(numPendingBeforeFailure, is(equalTo(1)));
+ assertNoPendingCheckpoints(coordinator);
+ }
+
+ /** Reports the current execution attempt of {@code vertex} as RUNNING to the graph. */
+ private void transitionToRunning(final ExecutionGraph graph, final ExecutionVertex vertex) {
+ final TaskExecutionState runningState = new TaskExecutionState(
+ graph.getJobID(),
+ vertex.getCurrentExecutionAttempt().getAttemptId(),
+ ExecutionState.RUNNING);
+ graph.updateState(runningState);
+ }
+
+ /** Fails the current attempt of {@code vertex} and runs all resulting main thread actions. */
+ private void failAndRunMainThreadActions(final ExecutionVertex vertex) {
+ vertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+ manualMainThreadExecutor.triggerAll();
+ }
+
+ /** Builds an EAGER-scheduled job graph with a single job vertex. */
+ private static JobGraph createStreamingJobGraph() {
+ final JobVertex source = new JobVertex("vertex1");
+ source.setInvokableClass(AbstractInvokable.class);
+
+ final JobGraph streamingJob = new JobGraph(source);
+ streamingJob.setScheduleMode(ScheduleMode.EAGER);
+ return streamingJob;
+ }
+
+ /**
+ * Builds an execution graph for {@code jobGraph} that uses the adapted region failover strategy,
+ * enables checkpointing on it, starts it on the manual main thread executor and schedules it.
+ */
+ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+ final ExecutionGraph graph = new ExecutionGraph(
+ new DummyJobInformation(jobGraph.getJobID(), jobGraph.getName()),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ AdaptedRestartPipelinedRegionStrategyNG::new,
+ new SimpleSlotProvider(jobGraph.getJobID(), 1));
+
+ graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ enableCheckpointing(graph);
+ graph.setScheduleMode(jobGraph.getScheduleMode());
+
+ graph.start(componentMainThreadExecutor);
+ graph.scheduleForExecution();
+ // run the scheduling actions queued on the manual main thread executor
+ manualMainThreadExecutor.triggerAll();
+ return graph;
+ }
+
+ /** Enables checkpointing with every job vertex acting as trigger, ack and commit vertex. */
+ private static void enableCheckpointing(final ExecutionGraph graph) {
+ final List<ExecutionJobVertex> allJobVertices = new ArrayList<>(graph.getAllVertices().values());
+ final CheckpointCoordinatorConfiguration coordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+ Long.MAX_VALUE,
+ Long.MAX_VALUE,
+ 0,
+ 1,
+ CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
+ true,
+ false,
+ 0);
+
+ graph.enableCheckpointing(
+ coordinatorConfiguration,
+ allJobVertices,
+ allJobVertices,
+ allJobVertices,
+ Collections.emptyList(),
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ new CheckpointStatsTracker(
+ 0,
+ allJobVertices,
+ coordinatorConfiguration,
+ new UnregisteredMetricsGroup()));
+ }
+
+ /** Asserts that {@code coordinator} currently has no pending checkpoint. */
+ private static void assertNoPendingCheckpoints(final CheckpointCoordinator coordinator) {
+ assertThat(coordinator.getPendingCheckpoints().entrySet(), is(empty()));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
new file mode 100644
index 0000000..20fb083
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ *
+ * <p>Main thread actions run on a {@link ManuallyTriggeredScheduledExecutor} and restarts are driven by a
+ * manually triggered {@code TestRestartStrategy}, so each test decides exactly when queued failover and
+ * restart actions are executed.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
+
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ /** Parallelism of both job vertices of the test topology. */
+ private static final int DEFAULT_PARALLELISM = 2;
+
+ /** Backs the main thread executor; queued actions run when the test calls {@code triggerAll()}. */
+ private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+ /** Main thread executor the execution graph is started with in {@code createExecutionGraph()}. */
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ /** Restart strategy of the graph; the tests fire restarts explicitly via {@code triggerNextAction()}. */
+ private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+ @Before
+ public void setUp() {
+ manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+ componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
+ manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+ }
+
+ /**
+ * Tests that 2 concurrent region failovers lead to a proper vertex state: although the sets of tasks
+ * to restart overlap (both failovers include ev21 and ev22), every task is restarted only once.
+ * <pre>
+ * (v11) -+-> (v21)
+ * x
+ * (v12) -+-> (v22)
+ *
+ * ^
+ * |
+ * (blocking)
+ * </pre>
+ */
+ @Test
+ public void testConcurrentRegionFailovers() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause {ev11} failure and delay the local recovery action via the blocker future
+ // - cause {ev12} failure and delay the local recovery action via the blocker future
+ // - resume local recovery actions
+ // - validate that each task is restarted only once
+
+ final ExecutionGraph eg = createExecutionGraph();
+
+ // install an incomplete blocker future; presumably the test strategy holds back the restart of
+ // failed regions until it is completed (see TestAdaptedRestartPipelinedRegionStrategyNG)
+ final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+ (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+ failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+ // assumes vertices are iterated topologically: both subtasks of v1, then both subtasks of v2
+ final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+ final ExecutionVertex ev11 = vertexIterator.next();
+ final ExecutionVertex ev12 = vertexIterator.next();
+ final ExecutionVertex ev21 = vertexIterator.next();
+ final ExecutionVertex ev22 = vertexIterator.next();
+
+ // start job scheduling
+ eg.scheduleForExecution();
+ manualMainThreadExecutor.triggerAll();
+
+ // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22}
+ ev11.getCurrentExecutionAttempt().fail(new Exception("task failure 1"));
+ manualMainThreadExecutor.triggerAll();
+ assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+ // fail ev12 to trigger region failover of {ev12}, {ev21}, {ev22}
+ // (ev21 and ev22 are already CANCELED by the first failover)
+ ev12.getCurrentExecutionAttempt().fail(new Exception("task failure 2"));
+ manualMainThreadExecutor.triggerAll();
+ assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+ assertEquals(ExecutionState.FAILED, ev12.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+ // complete region failover blocker to trigger region failover recovery
+ failoverStrategy.getBlockerFuture().complete(null);
+ manualMainThreadExecutor.triggerAll();
+
+ // verify that all tasks are recovered and no task is restarted more than once
+ // (ev21/ev22 stay CREATED, presumably because their blocking inputs are not produced yet;
+ // attempt number 1 on every vertex presumably means exactly one new attempt per task)
+ assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+ assertEquals(ExecutionState.CREATED, ev21.getExecutionState());
+ assertEquals(ExecutionState.CREATED, ev22.getExecutionState());
+ assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+ }
+
+ /**
+ * Tests that a global failover will take precedence over local failovers: a region failover that is
+ * still blocked when a global failover restarts the job must not restart any task once unblocked.
+ * <pre>
+ * (v11) -+-> (v21)
+ * x
+ * (v12) -+-> (v22)
+ *
+ * ^
+ * |
+ * (blocking)
+ * </pre>
+ */
+ @Test
+ public void testRegionFailoverInterruptedByGlobalFailover() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the blocker future
+ // - cause a global failure
+ // - resume the local recovery action
+ // - validate that the local recovery does not restart tasks
+
+ final ExecutionGraph eg = createExecutionGraph();
+
+ // install an incomplete blocker future; presumably the test strategy holds back the restart of
+ // failed regions until it is completed (see TestAdaptedRestartPipelinedRegionStrategyNG)
+ final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+ (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+ failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+ // assumes vertices are iterated topologically: both subtasks of v1, then both subtasks of v2
+ final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+ final ExecutionVertex ev11 = vertexIterator.next();
+ final ExecutionVertex ev12 = vertexIterator.next();
+ final ExecutionVertex ev21 = vertexIterator.next();
+ final ExecutionVertex ev22 = vertexIterator.next();
+
+ // start job scheduling
+ eg.scheduleForExecution();
+ manualMainThreadExecutor.triggerAll();
+
+ // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22}
+ ev11.getCurrentExecutionAttempt().fail(new Exception("task failure"));
+ manualMainThreadExecutor.triggerAll();
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+ // trigger a global failover, complete the cancellation of ev12 (the only task not cancelled by the
+ // region failover) and immediately trigger the restart of the job
+ eg.failGlobal(new Exception("Test global failure"));
+ ev12.getCurrentExecutionAttempt().completeCancelling();
+ manuallyTriggeredRestartStrategy.triggerNextAction();
+ manualMainThreadExecutor.triggerAll();
+
+ // verify the global modification version and that the global failover created one new attempt per vertex
+ assertEquals(2, eg.getGlobalModVersion());
+ assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+
+ // complete region failover blocker to trigger region failover
+ failoverStrategy.getBlockerFuture().complete(null);
+ manualMainThreadExecutor.triggerAll();
+
+ // verify that no task is restarted by region failover: states and attempt numbers are unchanged
+ assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+ assertEquals(ExecutionState.CREATED, ev21.getExecutionState());
+ assertEquals(ExecutionState.CREATED, ev22.getExecutionState());
+ assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+ }
+
+ /**
+ * Tests that a task failure reported to the failover strategy after the execution graph has been
+ * cancelled does not restart the vertex: it remains CANCELED.
+ */
+ @Test
+ public void testSkipFailoverIfExecutionStateIsNotRunning() throws Exception {
+ final ExecutionGraph executionGraph = createExecutionGraph();
+
+ final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
+ final ExecutionVertex firstVertex = vertexIterator.next();
+
+ // cancel before scheduling, then report a failure directly to the failover strategy
+ executionGraph.cancel();
+
+ final FailoverStrategy failoverStrategy = executionGraph.getFailoverStrategy();
+ failoverStrategy.onTaskFailure(firstVertex.getCurrentExecutionAttempt(), new Exception("Test Exception"));
+ manualMainThreadExecutor.triggerAll();
+
+ assertEquals(ExecutionState.CANCELED, firstVertex.getExecutionState());
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creating a sample ExecutionGraph for testing with topology as below.
+ * <pre>
+ * (v11) -+-> (v21)
+ * x
+ * (v12) -+-> (v22)
+ *
+ * ^
+ * |
+ * (blocking)
+ * </pre>
+ * 4 regions. Each consists of one individual execution vertex, since the only edge is blocking.
+ *
+ * <p>The graph uses {@code TestAdaptedRestartPipelinedRegionStrategyNG} and the manually triggered
+ * restart strategy. It is started on the manual main thread executor but not scheduled; tests call
+ * {@code scheduleForExecution()} themselves.
+ */
+ private ExecutionGraph createExecutionGraph() throws Exception {
+
+ final JobInformation jobInformation = new DummyJobInformation(TEST_JOB_ID, "test job");
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
+
+ final Time timeout = Time.seconds(10L);
+ final ExecutionGraph graph = new ExecutionGraph(
+ jobInformation,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ timeout,
+ manuallyTriggeredRestartStrategy,
+ TestAdaptedRestartPipelinedRegionStrategyNG::new,
+ slotProvider,
+ getClass().getClassLoader(),
+ VoidBlobWriter.getInstance(),
+ timeout);
+
+ JobVertex v1 = new JobVertex("vertex1");
+ v1.setInvokableClass(NoOpInvokable.class);
+ v1.setParallelism(DEFAULT_PARALLELISM);
+
+ JobVertex v2 = new JobVertex("vertex2");
+ v2.setInvokableClass(NoOpInvokable.class);
+ v2.setParallelism(DEFAULT_PARALLELISM);
+
+ // all-to-all blocking edge: every v2 subtask consumes from every v1 subtask
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
+ graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+ graph.start(componentMainThreadExecutor);
+
+ return graph;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
new file mode 100644
index 0000000..c4c1c32
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private FailingSlotProviderDecorator slotProvider;
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
+		slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14));
+	}
+
+	/**
+	 * Tests for region failover for job in EAGER mode.
+	 * This applies to streaming job, with no BLOCKING edge.
+	 * <pre>
+	 *     (v11) ---> (v21)
+	 *
+	 *     (v12) ---> (v22)
+	 *
+	 *       ^
+	 *       |
+	 *   (pipelined)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverInEagerMode() throws Exception {
+		// create a streaming job graph with EAGER schedule mode
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// trigger task failure of ev11
+		// vertices { ev11, ev21 } should be affected
+		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states and complete cancellation
+		assertVertexInState(ExecutionState.FAILED, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.CANCELING, ev21);
+		assertVertexInState(ExecutionState.DEPLOYING, ev22);
+		ev21.getCurrentExecutionAttempt().completeCancelling();
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states
+		// in eager mode, all affected vertices should be scheduled in failover
+		assertVertexInState(ExecutionState.DEPLOYING, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.DEPLOYING, ev21);
+		assertVertexInState(ExecutionState.DEPLOYING, ev22);
+
+		// verify attempt number
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	/**
+	 * Tests for scenario where a task fails for its own error, in which case the
+	 * region containing the failed task and its consumer regions should be restarted.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// trigger task failure of ev11
+		// regions {ev11}, {ev21}, {ev22} should be affected
+		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states
+		// only vertices with consumable inputs can be scheduled
+		assertVertexInState(ExecutionState.DEPLOYING, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.CREATED, ev21);
+		assertVertexInState(ExecutionState.CREATED, ev22);
+
+		// verify attempt number
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	/**
+	 * Tests that the failure is properly propagated to underlying strategy
+	 * to calculate tasks to restart.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testFailurePropagationToUnderlyingStrategy() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+			(TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// finish upstream regions to trigger scheduling of downstream regions
+		ev11.getCurrentExecutionAttempt().markFinished();
+		ev12.getCurrentExecutionAttempt().markFinished();
+
+		// trigger task failure of ev21 on consuming data from ev11
+		Exception taskFailureCause = new PartitionConnectionException(
+			new ResultPartitionID(
+				ev11.getProducedPartitions().keySet().iterator().next(),
+				ev11.getCurrentExecutionAttempt().getAttemptId()),
+			new Exception("Test failure"));
+		ev21.getCurrentExecutionAttempt().fail(taskFailureCause);
+		manualMainThreadExecutor.triggerAll();
+
+		// the producer region {ev11} must be restarted together with its consumers
+		assertThat(failoverStrategy.getLastTasksToRestart(),
+			containsInAnyOrder(ev11.getID(), ev21.getID(), ev22.getID()));
+	}
+
+	/**
+	 * Tests that when a task fails and the restart strategy doesn't support restarting, the job goes to FAILED.
+	 */
+	@Test
+	public void testNoRestart() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final NoRestartStrategy restartStrategy = new NoRestartStrategy();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy);
+
+		final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+		ev.fail(new Exception("Test Exception"));
+
+		for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
+			evs.getCurrentExecutionAttempt().completeCancelling();
+		}
+
+		manualMainThreadExecutor.triggerAll();
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	/**
+	 * Tests that the strategy falls back to a global failover when restarting the affected
+	 * tasks fails, here because every slot allocation fails. The global failover is
+	 * detected through a change of the graph's global modification version.
+	 */
+	@Test
+	public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		final long globalModVersionBeforeFailure = eg.getGlobalModVersion();
+
+		slotProvider.setFailSlotAllocation(true);
+		ev11.fail(new Exception("Test Exception"));
+		completeCancelling(ev11, ev12, ev21, ev22);
+
+		manualMainThreadExecutor.triggerAll();
+
+		final long globalModVersionAfterFailure = eg.getGlobalModVersion();
+
+		assertNotEquals(globalModVersionBeforeFailure, globalModVersionAfterFailure);
+	}
+
+	// ------------------------------- Test Utils -----------------------------------------
+
+	/**
+	 * Creating job graph as below (execution view).
+	 * It's a representative of streaming job.
+	 * <pre>
+	 *     (v11) ---> (v21)
+	 *
+	 *     (v12) ---> (v22)
+	 *
+	 *       ^
+	 *       |
+	 *   (pipelined)
+	 * </pre>
+	 * 2 regions. Each has 2 pipelined connected vertices.
+	 */
+	private JobGraph createStreamingJobGraph() {
+		final JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v2 = new JobVertex("vertex2");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob", v1, v2);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Creating job graph as below (execution view).
+	 * It's a representative of batch job.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 * 4 regions. Each consists of one individual vertex.
+	 */
+	private JobGraph createBatchJobGraph() {
+		final JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v2 = new JobVertex("vertex2");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		// use the same job id as the slot provider and the streaming job graph
+		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob", v1, v2);
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Creates, starts and schedules an execution graph for the given job graph, using an
+	 * {@link InfiniteDelayRestartStrategy}.
+	 */
+	private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+		return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10));
+	}
+
+	/**
+	 * Creates an execution graph for the given job graph that uses
+	 * {@link TestAdaptedRestartPipelinedRegionStrategyNG} and the decorated {@code slotProvider}.
+	 * The graph adopts the job graph's schedule mode, is started on the manual main thread
+	 * executor and scheduled, and all actions queued on that executor are run before returning.
+	 */
+	private ExecutionGraph createExecutionGraph(
+			final JobGraph jobGraph,
+			final RestartStrategy restartStrategy) throws Exception {
+
+		final ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobGraph.getJobID(),
+				jobGraph.getName()),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy,
+			TestAdaptedRestartPipelinedRegionStrategyNG::new,
+			slotProvider);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		eg.setScheduleMode(jobGraph.getScheduleMode());
+
+		eg.start(componentMainThreadExecutor);
+		eg.scheduleForExecution();
+		manualMainThreadExecutor.triggerAll();
+
+		return eg;
+	}
+
+	private static void assertVertexInState(final ExecutionState state, final ExecutionVertex vertex) {
+		assertEquals(state, vertex.getExecutionState());
+	}
+
+	/**
+	 * Completes the cancellation of the current execution attempt of each given vertex.
+	 */
+	private static void completeCancelling(ExecutionVertex... executionVertices) {
+		for (final ExecutionVertex executionVertex : executionVertices) {
+			executionVertex.getCurrentExecutionAttempt().completeCancelling();
+		}
+	}
+
+	/**
+	 * Test implementation of the {@link AdaptedRestartPipelinedRegionStrategyNG} that makes it possible
+	 * to control when the failover action is performed via {@link CompletableFuture}.
+	 * It also exposes some internal state of {@link AdaptedRestartPipelinedRegionStrategyNG}.
+	 */
+	static class TestAdaptedRestartPipelinedRegionStrategyNG extends AdaptedRestartPipelinedRegionStrategyNG {
+
+		private CompletableFuture<?> blockerFuture;
+
+		private Set<ExecutionVertexID> lastTasksToRestart;
+
+		TestAdaptedRestartPipelinedRegionStrategyNG(ExecutionGraph executionGraph) {
+			super(executionGraph);
+			this.blockerFuture = CompletableFuture.completedFuture(null);
+		}
+
+		/**
+		 * Sets the future that must complete, in addition to the termination of the cancelled
+		 * tasks, before the future returned by {@link #cancelTasks(Set)} completes.
+		 */
+		void setBlockerFuture(CompletableFuture<?> blockerFuture) {
+			this.blockerFuture = blockerFuture;
+		}
+
+		@Override
+		protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
+			// remember the vertices before delegating so tests can inspect the last failover
+			this.lastTasksToRestart = verticesToRestart;
+			super.restartTasks(verticesToRestart);
+		}
+
+		@Override
+		protected CompletableFuture<?> cancelTasks(final Set<ExecutionVertexID> vertices) {
+			// cancellation only counts as done once both the real termination and the blocker complete
+			final List<CompletableFuture<?>> terminationAndBlocker = Arrays.asList(
+				super.cancelTasks(vertices),
+				blockerFuture);
+			return FutureUtils.waitForAll(terminationAndBlocker);
+		}
+
+		CompletableFuture<?> getBlockerFuture() {
+			return blockerFuture;
+		}
+
+		/**
+		 * Returns the vertices passed to the most recent {@link #restartTasks(Set)} call,
+		 * or {@code null} if it has not been called yet.
+		 */
+		Set<ExecutionVertexID> getLastTasksToRestart() {
+			return lastTasksToRestart;
+		}
+
+		/**
+		 * Same as {@link #getLastTasksToRestart()}; kept for backward compatibility with existing callers.
+		 */
+		Set<ExecutionVertexID> getLastTasksToCancel() {
+			return getLastTasksToRestart();
+		}
+	}
+
+	/**
+	 * {@link SlotProvider} decorator that completes every slot allocation exceptionally with a
+	 * {@link TimeoutException} once {@link #setFailSlotAllocation(boolean)} was called with
+	 * {@code true}. In all other cases it delegates to the wrapped provider.
+	 */
+	private static class FailingSlotProviderDecorator implements SlotProvider {
+
+		private final SlotProvider delegate;
+
+		private boolean failSlotAllocation = false;
+
+		FailingSlotProviderDecorator(final SlotProvider delegate) {
+			this.delegate = checkNotNull(delegate);
+		}
+
+		@Override
+		public CompletableFuture<LogicalSlot> allocateSlot(
+				final SlotRequestId slotRequestId,
+				final ScheduledUnit scheduledUnit,
+				final SlotProfile slotProfile,
+				final boolean allowQueuedScheduling,
+				final Time allocationTimeout) {
+			if (failSlotAllocation) {
+				return FutureUtils.completedExceptionally(new TimeoutException("Expected"));
+			}
+			return delegate.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
+		}
+
+		@Override
+		public void cancelSlotRequest(
+				final SlotRequestId slotRequestId,
+				@Nullable final SlotSharingGroupId slotSharingGroupId,
+				final Throwable cause) {
+			delegate.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
+		}
+
+		public void setFailSlotAllocation(final boolean failSlotAllocation) {
+			this.failSlotAllocation = failSlotAllocation;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1e6be72..e78fa6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -68,6 +70,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -264,6 +267,30 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
+	/**
+	 * Tests that a task failure which occurs while the graph is in a global failover
+	 * (job status FAILING) is not passed to the failover strategy as a local failure.
+	 */
+	@Test
+	public void testTaskFailingWhileGlobalFailing() throws Exception {
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setFailoverStrategyFactory(new TestFailoverStrategy.Factory())
+				.buildAndScheduleForExecution(slotPool);
+			final TestFailoverStrategy failoverStrategy = (TestFailoverStrategy) graph.getFailoverStrategy();
+
+			// switch all tasks (of every job vertex, not only the first one) to running
+			switchAllTasksToRunning(graph);
+
+			graph.failGlobal(new Exception("test"));
+			// precondition of this test: the global failover is still in progress
+			assertEquals(JobStatus.FAILING, graph.getState());
+
+			graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test task failure"));
+
+			// no local failover should happen when in global failover cancelling
+			assertEquals(0, failoverStrategy.getLocalFailoverCount());
+		}
+	}
+
+	/** Applies {@link Execution#switchToRunning()} to the graph's executions via {@code executeOperationForAllExecutions}. */
 private void switchAllTasksToRunning(ExecutionGraph graph) {
 executeOperationForAllExecutions(graph, Execution::switchToRunning);
 }
@@ -659,6 +686,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
private static class TestingExecutionGraphBuilder {
private RestartStrategy restartStrategy = new NoRestartStrategy();
+ private FailoverStrategy.Factory failoverStrategyFactory = new RestartAllStrategy.Factory();
private JobGraph jobGraph = createJobGraph();
private int tasksNum = NUM_TASKS;
private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
@@ -668,6 +696,11 @@ public class ExecutionGraphRestartTest extends TestLogger {
return this;
}
+		/**
+		 * Sets the factory that creates the failover strategy of the graph built by this builder.
+		 */
+		private TestingExecutionGraphBuilder setFailoverStrategyFactory(
+				final FailoverStrategy.Factory failoverStrategyFactory) {
+			this.failoverStrategyFactory = failoverStrategyFactory;
+			return this;
+		}
+
private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) {
this.jobGraph = jobGraph;
return this;
@@ -689,7 +722,11 @@ public class ExecutionGraphRestartTest extends TestLogger {
private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception {
final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation);
- final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph);
+ final ExecutionGraph eg = createSimpleExecutionGraph(
+ restartStrategy,
+ failoverStrategyFactory,
+ scheduler,
+ jobGraph);
assertEquals(JobStatus.CREATED, eg.getState());
@@ -744,18 +781,32 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
private static ExecutionGraph createSimpleExecutionGraph(
- RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph)
- throws IOException, JobException {
+ final RestartStrategy restartStrategy,
+ final SlotProvider slotProvider,
+ final JobGraph jobGraph) throws IOException, JobException {
- ExecutionGraph executionGraph = new ExecutionGraph(
+ return createSimpleExecutionGraph(restartStrategy, new RestartAllStrategy.Factory(), slotProvider, jobGraph);
+ }
+
+ private static ExecutionGraph createSimpleExecutionGraph(
+ final RestartStrategy restartStrategy,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final SlotProvider slotProvider,
+ final JobGraph jobGraph) throws IOException, JobException {
+
+ final ExecutionGraph executionGraph = new ExecutionGraph(
+ new JobInformation(
+ TEST_JOB_ID,
+ "Test job",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.emptyList(),
+ Collections.emptyList()),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- TEST_JOB_ID,
- "Test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
restartStrategy,
+ failoverStrategyFactory,
slotProvider);
executionGraph.start(mainThreadExecutor);
@@ -779,4 +830,45 @@ public class ExecutionGraphRestartTest extends TestLogger {
finishAllVertices(eg);
assertEquals(JobStatus.FINISHED, eg.getState());
}
+
+	/**
+	 * {@link FailoverStrategy} for tests. It restarts nothing and only counts how often
+	 * {@link #onTaskFailure(Execution, Throwable)} is invoked, which is the number of local failovers.
+	 */
+	static class TestFailoverStrategy extends FailoverStrategy {
+
+		/** Number of {@code onTaskFailure} invocations seen so far. */
+		private int onTaskFailureCalls;
+
+		int getLocalFailoverCount() {
+			return onTaskFailureCalls;
+		}
+
+		@Override
+		public String getStrategyName() {
+			return "Test Failover Strategy";
+		}
+
+		@Override
+		public void onTaskFailure(Execution taskExecution, Throwable cause) {
+			onTaskFailureCalls += 1;
+		}
+
+		@Override
+		public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+			// intentionally empty: this strategy keeps no per-vertex state
+		}
+
+		// ------------------------------------------------------------------------
+		//  factory
+		// ------------------------------------------------------------------------
+
+		/**
+		 * Factory creating a fresh {@link TestFailoverStrategy}; the execution graph is ignored.
+		 */
+		public static class Factory implements FailoverStrategy.Factory {
+
+			@Override
+			public FailoverStrategy create(ExecutionGraph executionGraph) {
+				return new TestFailoverStrategy();
+			}
+		}
+	}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index b9c1322..f17dddc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -627,7 +627,7 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger {
final Configuration jobManagerConfig = new Configuration();
jobManagerConfig.setString(
JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
- FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
+ FailoverStrategyLoader.LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME);
final Time timeout = Time.seconds(10L);
return ExecutionGraphBuilder.buildGraph(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
index 81f5e38..1063222 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
@@ -524,7 +524,7 @@ public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
// utilities
// ------------------------------------------------------------------------
- private static void assertSameRegion(FailoverRegion ...regions) {
+ public static void assertSameRegion(FailoverRegion ...regions) {
checkNotNull(regions);
for (int i = 0; i < regions.length; i++) {
for (int j = i + 1; i < regions.length; i++) {
@@ -533,7 +533,7 @@ public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
}
}
- private static void assertDistinctRegions(FailoverRegion ...regions) {
+ public static void assertDistinctRegions(FailoverRegion ...regions) {
checkNotNull(regions);
for (int i = 0; i < regions.length; i++) {
for (int j = i + 1; j < regions.length; j++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
new file mode 100644
index 0000000..9ba26ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link ExecutionVertexVersioner}. They cover modification detection for a single
+ * vertex and the computation of the unmodified vertices for a set of recorded versions.
+ */
+public class ExecutionVertexVersionerTest extends TestLogger {
+
+	private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID1 = new ExecutionVertexID(new JobVertexID(), 0);
+
+	private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID2 = new ExecutionVertexID(new JobVertexID(), 0);
+
+	private static final Collection<ExecutionVertexID> TEST_ALL_EXECUTION_VERTEX_IDS =
+		Arrays.asList(TEST_EXECUTION_VERTEX_ID1, TEST_EXECUTION_VERTEX_ID2);
+
+	private ExecutionVertexVersioner executionVertexVersioner;
+
+	@Before
+	public void setUp() {
+		executionVertexVersioner = new ExecutionVertexVersioner();
+	}
+
+	@Test
+	public void isModifiedReturnsFalseIfVertexUnmodified() {
+		final ExecutionVertexVersion recordedVersion = executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+
+		final boolean modified = executionVertexVersioner.isModified(recordedVersion);
+
+		assertFalse(modified);
+	}
+
+	@Test
+	public void isModifiedReturnsTrueIfVertexIsModified() {
+		final ExecutionVertexVersion staleVersion = executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+		// a second modification makes the first recorded version stale
+		executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+
+		final boolean modified = executionVertexVersioner.isModified(staleVersion);
+
+		assertTrue(modified);
+	}
+
+	@Test
+	public void throwsExceptionIfVertexWasNeverModified() {
+		try {
+			executionVertexVersioner.isModified(new ExecutionVertexVersion(TEST_EXECUTION_VERTEX_ID1, 0));
+		} catch (final IllegalStateException expected) {
+			assertThat(expected.getMessage(), containsString("Execution vertex "
+				+ TEST_EXECUTION_VERTEX_ID1 + " does not have a recorded version"));
+			return;
+		}
+		fail("Expected exception not thrown");
+	}
+
+	@Test
+	public void getUnmodifiedVerticesAllVerticesModified() {
+		final Set<ExecutionVertexVersion> initialVersions = recordAllVertexModifications();
+		// recording again supersedes every initially recorded version
+		executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS);
+
+		final Set<ExecutionVertexID> unmodified = executionVertexVersioner.getUnmodifiedExecutionVertices(initialVersions);
+
+		assertThat(unmodified, is(empty()));
+	}
+
+	@Test
+	public void getUnmodifiedVerticesNoVertexModified() {
+		final Set<ExecutionVertexVersion> initialVersions = recordAllVertexModifications();
+
+		final Set<ExecutionVertexID> unmodified = executionVertexVersioner.getUnmodifiedExecutionVertices(initialVersions);
+
+		assertThat(unmodified, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID1, TEST_EXECUTION_VERTEX_ID2));
+	}
+
+	@Test
+	public void getUnmodifiedVerticesPartOfVerticesModified() {
+		final Set<ExecutionVertexVersion> initialVersions = recordAllVertexModifications();
+		// only the first vertex gets a newer version
+		executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+
+		final Set<ExecutionVertexID> unmodified = executionVertexVersioner.getUnmodifiedExecutionVertices(initialVersions);
+
+		assertThat(unmodified, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID2));
+	}
+
+	/**
+	 * Records a modification for all test vertices and returns the resulting versions as a set.
+	 */
+	private Set<ExecutionVertexVersion> recordAllVertexModifications() {
+		return new HashSet<>(executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values());
+	}
+}