You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/28 07:38:05 UTC
[flink] branch master updated: [FLINK-12068][runtime] Implement
region backtracking for region failover strategy
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 58987dd [FLINK-12068][runtime] Implement region backtracking for region failover strategy
58987dd is described below
commit 58987dd16c7e8af36e935e811f716d2f843de5ca
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Wed May 15 18:38:37 2019 +0800
[FLINK-12068][runtime] Implement region backtracking for region failover strategy
---
.../failover/flip1/FailoverRegion.java | 20 +-
.../flip1/RestartPipelinedRegionStrategy.java | 164 +++++++--
.../flip1/ResultPartitionAvailabilityChecker.java | 35 ++
.../flip1/RestartPipelinedRegionStrategyTest.java | 388 ++++++++++++++++++++-
.../failover/flip1/TestFailoverTopology.java | 12 +-
5 files changed, 569 insertions(+), 50 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
index 88a7658..9a013df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
@@ -20,10 +20,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,18 +30,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class FailoverRegion {
+ /** All vertex IDs in this region. */
+ private final Set<ExecutionVertexID> executionVertexIDs;
+
/** All vertices in this region. */
- private final Map<ExecutionVertexID, FailoverVertex> executionVertices;
+ private final Set<FailoverVertex> executionVertices;
/**
* Creates a new failover region containing a set of vertices.
*
* @param executionVertices to be contained in this region
*/
- public FailoverRegion(Collection<? extends FailoverVertex> executionVertices) {
- checkNotNull(executionVertices);
- this.executionVertices = new HashMap<>();
- executionVertices.forEach(v -> this.executionVertices.put(v.getExecutionVertexID(), v));
+ public FailoverRegion(Set<FailoverVertex> executionVertices) {
+ this.executionVertices = checkNotNull(executionVertices);
+ this.executionVertexIDs = new HashSet<>();
+ executionVertices.forEach(v -> this.executionVertexIDs.add(v.getExecutionVertexID()));
}
/**
@@ -53,7 +53,7 @@ public class FailoverRegion {
* @return IDs of all vertices in this region
*/
public Set<ExecutionVertexID> getAllExecutionVertexIDs() {
- return executionVertices.keySet();
+ return executionVertexIDs;
}
/**
@@ -62,6 +62,6 @@ public class FailoverRegion {
* @return all vertices in this region
*/
public Set<FailoverVertex> getAllExecutionVertices() {
- return new HashSet<>(executionVertices.values());
+ return executionVertices;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index e7040c3..8e06327 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -18,20 +18,22 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,23 +49,46 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
/** The topology containing info about all the vertices and edges. */
private final FailoverTopology topology;
+ /** All failover regions. */
+ private final IdentityHashMap<FailoverRegion, Object> regions;
+
/** Maps execution vertex id to failover region. */
- private final Map<ExecutionVertexID, FailoverRegion> regions;
+ private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
+
+ /** The checker helps to query result partition availability. */
+ private final RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
/**
* Creates a new failover strategy to restart pipelined regions that works on the given topology.
+ * The result partitions are always considered to be available if no data consumption error happens.
*
* @param topology containing info about all the vertices and edges
*/
+ @VisibleForTesting
public RestartPipelinedRegionStrategy(FailoverTopology topology) {
+ this(topology, resultPartitionID -> true);
+ }
+
+ /**
+ * Creates a new failover strategy to restart pipelined regions that works on the given topology.
+ *
+ * @param topology containing info about all the vertices and edges
+ * @param resultPartitionAvailabilityChecker helps to query result partition availability
+ */
+ public RestartPipelinedRegionStrategy(
+ FailoverTopology topology,
+ ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
+
this.topology = checkNotNull(topology);
- this.regions = new HashMap<>();
+ this.regions = new IdentityHashMap<>();
+ this.vertexToRegionMap = new HashMap<>();
+ this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(
+ resultPartitionAvailabilityChecker);
// build regions based on the given topology
LOG.info("Start building failover regions.");
buildFailoverRegions();
}
-
// ------------------------------------------------------------------------
// region building
// ------------------------------------------------------------------------
@@ -131,25 +156,28 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
for (HashSet<FailoverVertex> regionVertices : distinctRegions.keySet()) {
LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
+ regions.put(failoverRegion, null);
for (FailoverVertex vertex : regionVertices) {
- this.regions.put(vertex.getExecutionVertexID(), failoverRegion);
+ vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion);
}
}
- LOG.info("Created {} failover regions.", distinctRegions.size());
+
+ LOG.info("Created {} failover regions.", regions.size());
}
private void buildOneRegionForAllVertices() {
LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
- final List<FailoverVertex> allVertices = new ArrayList<>();
+ final Set<FailoverVertex> allVertices = new HashSet<>();
for (FailoverVertex vertex : topology.getFailoverVertices()) {
allVertices.add(vertex);
}
final FailoverRegion region = new FailoverRegion(allVertices);
+ regions.put(region, null);
for (FailoverVertex vertex : topology.getFailoverVertices()) {
- regions.put(vertex.getExecutionVertexID(), region);
+ vertexToRegionMap.put(vertex.getExecutionVertexID(), region);
}
}
@@ -162,9 +190,9 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
* In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
* The 'involved' regions are calculated with rules below:
* 1. The region containing the failed task is always involved
- * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+ * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
* the region containing the partition producer task is involved
- * 3. TODO: If a region is involved, all of its consumer regions are involved
+ * 3. If a region is involved, all of its consumer regions are involved
*
* @param executionVertexId ID of the failed task
* @param cause cause of the failure
@@ -172,30 +200,87 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
*/
@Override
public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
- final FailoverRegion failedRegion = regions.get(executionVertexId);
+ LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+ final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
if (failedRegion == null) {
// TODO: show the task name in the log
throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
}
- // TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+ // if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+ // so that the failover process will try to recover it
+ Optional<PartitionException> dataConsumptionException = ExceptionUtils.findThrowable(
+ cause, PartitionException.class);
+ if (dataConsumptionException.isPresent()) {
+ resultPartitionAvailabilityChecker.markResultPartitionFailed(
+ dataConsumptionException.get().getPartitionId().getPartitionId());
+ }
+
+ // calculate the tasks to restart based on the result of regions to restart
+ Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+ for (FailoverRegion region : getRegionsToRestart(failedRegion)) {
+ tasksToRestart.addAll(region.getAllExecutionVertexIDs());
+ }
- return getRegionsToRestart(failedRegion).stream().flatMap(
- r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+		// the previously failed partition will be recovered, so remove its failed state from the checker
+ if (dataConsumptionException.isPresent()) {
+ resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+ dataConsumptionException.get().getPartitionId().getPartitionId());
+ }
+
+ LOG.info("{} tasks should be restarted to recover the failed task {}. ", tasksToRestart.size(), executionVertexId);
+ return tasksToRestart;
}
/**
* All 'involved' regions are proposed to be restarted.
* The 'involved' regions are calculated with rules below:
* 1. The region containing the failed task is always involved
- * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+ * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
* the region containing the partition producer task is involved
- * 3. TODO: If a region is involved, all of its consumer regions are involved
+ * 3. If a region is involved, all of its consumer regions are involved
*/
- private Set<FailoverRegion> getRegionsToRestart(FailoverRegion regionToRestart) {
- return Collections.singleton(regionToRestart);
+ private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
+ IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>();
+ IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>();
+
+ // start from the failed region to visit all involved regions
+ Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+ visitedRegions.put(failedRegion, null);
+ regionsToVisit.add(failedRegion);
+ while (!regionsToVisit.isEmpty()) {
+ FailoverRegion regionToRestart = regionsToVisit.poll();
+
+ // an involved region should be restarted
+ regionsToRestart.put(regionToRestart, null);
- // TODO: implement backtracking logic
+ // if a needed input result partition is not available, its producer region is involved
+ for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
+ for (FailoverEdge inEdge : vertex.getInputEdges()) {
+ if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) {
+ FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
+ if (!visitedRegions.containsKey(producerRegion)) {
+ visitedRegions.put(producerRegion, null);
+ regionsToVisit.add(producerRegion);
+ }
+ }
+ }
+ }
+
+ // all consumer regions of an involved region should be involved
+ for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
+ for (FailoverEdge outEdge : vertex.getOutputEdges()) {
+ FailoverRegion consumerRegion = vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID());
+ if (!visitedRegions.containsKey(consumerRegion)) {
+ visitedRegions.put(consumerRegion, null);
+ regionsToVisit.add(consumerRegion);
+ }
+ }
+ }
+ }
+
+ return regionsToRestart.keySet();
}
// ------------------------------------------------------------------------
@@ -208,7 +293,38 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
* @return the failover region that contains the given execution vertex
*/
@VisibleForTesting
- FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
- return regions.get(vertexID);
+ public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
+ return vertexToRegionMap.get(vertexID);
+ }
+
+ /**
+	 * A stateful {@link ResultPartitionAvailabilityChecker} which tracks partitions marked as failed and reports them as unavailable.
+ */
+ private static class RegionFailoverResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+ /** Result partition state checker from the shuffle master. */
+ private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
+
+	/** Records partitions which have caused {@link PartitionException}. */
+ private final HashSet<IntermediateResultPartitionID> failedPartitions;
+
+ RegionFailoverResultPartitionAvailabilityChecker(ResultPartitionAvailabilityChecker checker) {
+ this.resultPartitionAvailabilityChecker = checkNotNull(checker);
+ this.failedPartitions = new HashSet<>();
+ }
+
+ @Override
+ public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
+ return !failedPartitions.contains(resultPartitionID) &&
+ resultPartitionAvailabilityChecker.isAvailable(resultPartitionID);
+ }
+
+ public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
+ failedPartitions.add(resultPartitionID);
+ }
+
+ public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
+ failedPartitions.remove(resultPartitionID);
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
new file mode 100644
index 0000000..286ad3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+/**
+ * This checker helps to query result partition availability.
+ */
+interface ResultPartitionAvailabilityChecker {
+
+ /**
+ * Returns whether the given partition is available.
+ *
+ * @param resultPartitionID ID of the result partition to query
+ * @return whether the given partition is available
+ */
+ boolean isAvailable(IntermediateResultPartitionID resultPartitionID);
+}
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
index 9a3f595..5a9c844 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
@@ -18,10 +18,22 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
/**
@@ -30,37 +42,387 @@ import static org.junit.Assert.assertEquals;
public class RestartPipelinedRegionStrategyTest extends TestLogger {
/**
- * Tests for scenes that a task fails for its own error, in which case only the
- * region containing the failed task should be restarted.
+	 * Tests the scenario where a task fails due to its own error, in which case the
+ * region containing the failed task and its consumer regions should be restarted.
* <pre>
- * (v1)
+ * (v1) -+-> (v4)
+ * x
+ * (v2) -+-> (v5)
*
- * (v2)
+ * (v3) -+-> (v6)
*
- * (v3)
+ * ^
+ * |
+ * (blocking)
* </pre>
+ * Each vertex is in an individual region.
*/
@Test
- public void testRegionFailoverForTaskInternalErrors() throws Exception {
+ public void testRegionFailoverForRegionInternalErrors() throws Exception {
TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+
+ topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);
FailoverTopology topology = topologyBuilder.build();
RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
- FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
- FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
- FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
-
- assertEquals(r1.getAllExecutionVertexIDs(),
+ // when v1 fails, {v1,v4,v5} should be restarted
+ HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
+ expectedResult.add(v1.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")));
- assertEquals(r2.getAllExecutionVertexIDs(),
+
+ // when v2 fails, {v2,v4,v5} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v2.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")));
- assertEquals(r3.getAllExecutionVertexIDs(),
+
+ // when v3 fails, {v3,v6} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v3.getExecutionVertexID());
+ expectedResult.add(v6.getExecutionVertexID());
+ assertEquals(expectedResult,
strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")));
+
+ // when v4 fails, {v4} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v4.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v4.getExecutionVertexID(), new Exception("Test failure")));
+
+ // when v5 fails, {v5} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v5.getExecutionVertexID(), new Exception("Test failure")));
+
+ // when v6 fails, {v6} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v6.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v6.getExecutionVertexID(), new Exception("Test failure")));
+ }
+
+ /**
+	 * Tests the scenario where a task fails due to a data consumption error, in which case the
+ * region containing the failed task, the region containing the unavailable result partition
+ * and all their consumer regions should be restarted.
+ * <pre>
+ * (v1) -+-> (v4)
+ * x
+ * (v2) -+-> (v5)
+ *
+ * (v3) -+-> (v6)
+ *
+ * ^
+ * |
+ * (blocking)
+ * </pre>
+ * Each vertex is in an individual region.
+ */
+ @Test
+ public void testRegionFailoverForDataConsumptionErrors() throws Exception {
+ TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+ TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+
+ topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);
+
+ FailoverTopology topology = topologyBuilder.build();
+
+ RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+ // when v4 fails to consume data from v1, {v1,v4,v5} should be restarted
+ HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
+ Iterator<? extends FailoverEdge> v4InputEdgeIterator = v4.getInputEdges().iterator();
+ expectedResult.add(v1.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v4.getExecutionVertexID(),
+ new PartitionConnectionException(
+ new ResultPartitionID(
+ v4InputEdgeIterator.next().getResultPartitionID(),
+ new ExecutionAttemptID()),
+ new Exception("Test failure"))));
+
+ // when v4 fails to consume data from v2, {v2,v4,v5} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v2.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v4.getExecutionVertexID(),
+ new PartitionNotFoundException(
+ new ResultPartitionID(
+ v4InputEdgeIterator.next().getResultPartitionID(),
+ new ExecutionAttemptID()))));
+
+ // when v5 fails to consume data from v1, {v1,v4,v5} should be restarted
+ expectedResult.clear();
+ Iterator<? extends FailoverEdge> v5InputEdgeIterator = v5.getInputEdges().iterator();
+ expectedResult.add(v1.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v5.getExecutionVertexID(),
+ new PartitionConnectionException(
+ new ResultPartitionID(
+ v5InputEdgeIterator.next().getResultPartitionID(),
+ new ExecutionAttemptID()),
+ new Exception("Test failure"))));
+
+ // when v5 fails to consume data from v2, {v2,v4,v5} should be restarted
+ expectedResult.clear();
+ expectedResult.add(v2.getExecutionVertexID());
+ expectedResult.add(v4.getExecutionVertexID());
+ expectedResult.add(v5.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v5.getExecutionVertexID(),
+ new PartitionNotFoundException(
+ new ResultPartitionID(
+ v5InputEdgeIterator.next().getResultPartitionID(),
+ new ExecutionAttemptID()))));
+
+ // when v6 fails to consume data from v3, {v3,v6} should be restarted
+ expectedResult.clear();
+ Iterator<? extends FailoverEdge> v6InputEdgeIterator = v6.getInputEdges().iterator();
+ expectedResult.add(v3.getExecutionVertexID());
+ expectedResult.add(v6.getExecutionVertexID());
+ assertEquals(expectedResult,
+ strategy.getTasksNeedingRestart(v6.getExecutionVertexID(),
+ new PartitionConnectionException(
+ new ResultPartitionID(
+ v6InputEdgeIterator.next().getResultPartitionID(),
+ new ExecutionAttemptID()),
+ new Exception("Test failure"))));
+ }
+
+ /**
+ * Tests to verify region failover results regarding different input result partition availability combinations.
+ * <pre>
+ * (v1) --rp1--\
+ * (v3)
+ * (v2) --rp2--/
+ *
+ * ^
+ * |
+ * (blocking)
+ * </pre>
+ * Each vertex is in an individual region.
+ * rp1, rp2 are result partitions.
+ */
+ @Test
+ public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
+ TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+ TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+ TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+
+ topologyBuilder.connect(v1, v3, ResultPartitionType.BLOCKING);
+ topologyBuilder.connect(v2, v3, ResultPartitionType.BLOCKING);
+
+ FailoverTopology topology = topologyBuilder.build();
+
+ TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
+ RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology, availabilityChecker);
+
+ IntermediateResultPartitionID rp1ID = v1.getOutputEdges().iterator().next().getResultPartitionID();
+ IntermediateResultPartitionID rp2ID = v2.getOutputEdges().iterator().next().getResultPartitionID();
+
+ // -------------------------------------------------
+		// Combination1: (rp1 == available, rp2 == available)
+ // -------------------------------------------------
+ availabilityChecker.failedPartitions.clear();
+
+ // when v1 fails, {v1,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v2 fails, {v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v3 fails, {v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v3.getExecutionVertexID()));
+
+ // -------------------------------------------------
+		// Combination2: (rp1 == unavailable, rp2 == available)
+ // -------------------------------------------------
+ availabilityChecker.failedPartitions.clear();
+ availabilityChecker.markResultPartitionFailed(rp1ID);
+
+ // when v1 fails, {v1,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v2 fails, {v1,v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v3 fails, {v1,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // -------------------------------------------------
+		// Combination3: (rp1 == available, rp2 == unavailable)
+ // -------------------------------------------------
+ availabilityChecker.failedPartitions.clear();
+ availabilityChecker.markResultPartitionFailed(rp2ID);
+
+ // when v1 fails, {v1,v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v2 fails, {v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v3 fails, {v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // -------------------------------------------------
+		// Combination4: (rp1 == unavailable, rp2 == unavailable)
+ // -------------------------------------------------
+ availabilityChecker.failedPartitions.clear();
+ availabilityChecker.markResultPartitionFailed(rp1ID);
+ availabilityChecker.markResultPartitionFailed(rp2ID);
+
+ // when v1 fails, {v1,v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v2 fails, {v1,v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+ // when v3 fails, {v1,v2,v3} should be restarted
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+ }
+
+ /**
+ * Tests region failover for a topology in which every region contains multiple vertices.
+ * <pre>
+ * (v1) ---> (v2) --|--> (v3) ---> (v4) --|--> (v5) ---> (v6)
+ *
+ *       ^          ^          ^          ^          ^
+ *       |          |          |          |          |
+ * (pipelined) (blocking) (pipelined) (blocking) (pipelined)
+ * </pre>
+ * Region 1: v1,v2; region 2: v3,v4; region 3: v5,v6
+ */
+ @Test
+ public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
+ final TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
+
+ final TestFailoverTopology.TestFailoverVertex v1 = builder.newVertex();
+ final TestFailoverTopology.TestFailoverVertex v2 = builder.newVertex();
+ final TestFailoverTopology.TestFailoverVertex v3 = builder.newVertex();
+ final TestFailoverTopology.TestFailoverVertex v4 = builder.newVertex();
+ final TestFailoverTopology.TestFailoverVertex v5 = builder.newVertex();
+ final TestFailoverTopology.TestFailoverVertex v6 = builder.newVertex();
+
+ // the two BLOCKING edges (v2 -> v3 and v4 -> v5) separate the three regions shown above
+ builder.connect(v1, v2, ResultPartitionType.PIPELINED);
+ builder.connect(v2, v3, ResultPartitionType.BLOCKING);
+ builder.connect(v3, v4, ResultPartitionType.PIPELINED);
+ builder.connect(v4, v5, ResultPartitionType.BLOCKING);
+ builder.connect(v5, v6, ResultPartitionType.PIPELINED);
+
+ final FailoverTopology topology = builder.build();
+ final RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+ // an internal error in v3 restarts its own region {v3,v4} and the region behind it, {v5,v6}
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+ containsInAnyOrder(
+ v3.getExecutionVertexID(),
+ v4.getExecutionVertexID(),
+ v5.getExecutionVertexID(),
+ v6.getExecutionVertexID()));
+
+ // if v3 fails to consume the partition produced by v2, the producer region {v1,v2} is restarted too
+ final ResultPartitionID consumedPartition = new ResultPartitionID(
+ v3.getInputEdges().iterator().next().getResultPartitionID(),
+ new ExecutionAttemptID());
+ final PartitionConnectionException consumptionFailure =
+ new PartitionConnectionException(consumedPartition, new Exception("Test failure"));
+ assertThat(
+ strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), consumptionFailure),
+ containsInAnyOrder(
+ v1.getExecutionVertexID(),
+ v2.getExecutionVertexID(),
+ v3.getExecutionVertexID(),
+ v4.getExecutionVertexID(),
+ v5.getExecutionVertexID(),
+ v6.getExecutionVertexID()));
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test {@link ResultPartitionAvailabilityChecker}: every partition is reported as available
+ * unless it has been explicitly marked as failed.
+ */
+ private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+ /** Partitions that are currently reported as unavailable; tests may also clear it directly. */
+ private final HashSet<IntermediateResultPartitionID> failedPartitions;
+
+ public TestResultPartitionAvailabilityChecker() {
+ failedPartitions = new HashSet<>();
+ }
+
+ /** Marks the given partition as failed so that {@link #isAvailable} returns {@code false} for it. */
+ public void markResultPartitionFailed(IntermediateResultPartitionID partitionId) {
+ failedPartitions.add(partitionId);
+ }
+
+ /** Undoes {@link #markResultPartitionFailed} for the given partition. */
+ public void removeResultPartitionFromFailedState(IntermediateResultPartitionID partitionId) {
+ failedPartitions.remove(partitionId);
+ }
+
+ @Override
+ public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
+ final boolean markedFailed = failedPartitions.contains(resultPartitionID);
+ return !markedFailed;
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
index 9daec8e..6b8b5fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
@@ -130,9 +130,7 @@ public class TestFailoverTopology implements FailoverTopology {
private Collection<FailoverVertex> vertices = new ArrayList<>();
public TestFailoverVertex newVertex() {
- TestFailoverVertex testFailoverVertex = newVertex(UUID.randomUUID().toString());
- vertices.add(testFailoverVertex);
- return testFailoverVertex;
+ // Creates a vertex named with a random UUID by delegating to newVertex(String).
+ // NOTE(review): presumably newVertex(String) already adds the vertex to `vertices`, which would
+ // mean the removed explicit add() registered every unnamed vertex twice — confirm.
+ return newVertex(UUID.randomUUID().toString());
}
public TestFailoverVertex newVertex(String name) {
@@ -149,6 +147,14 @@ public class TestFailoverTopology implements FailoverTopology {
return this;
}
+ /**
+ * Connects {@code source} to {@code target} through a new edge of the given partition type that
+ * carries the caller-supplied {@code partitionID}, so a test can refer to that partition later.
+ * The edge is registered as an output edge of {@code source} and an input edge of {@code target}.
+ *
+ * @return this builder, for call chaining
+ */
+ public Builder connect(TestFailoverVertex source, TestFailoverVertex target, ResultPartitionType partitionType, IntermediateResultPartitionID partitionID) {
+ final FailoverEdge connectingEdge = new TestFailoverEdge(partitionID, partitionType, source, target);
+ source.addOuputEdge(connectingEdge);
+ target.addInputEdge(connectingEdge);
+ return this;
+ }
+
public Builder setContainsCoLocationConstraints(boolean containsCoLocationConstraints) {
this.containsCoLocationConstraints = containsCoLocationConstraints;
return this;