You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 21:29:15 UTC
[1/4] flink git commit: [FLINK-5867] [flip-1] Add tests for pipelined
failover region construction
Repository: flink
Updated Branches:
refs/heads/master b01d737ae -> e515b9bb2
[FLINK-5867] [flip-1] Add tests for pipelined failover region construction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/166a3f87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/166a3f87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/166a3f87
Branch: refs/heads/master
Commit: 166a3f877b8875dd4a3c2138f802241de7d9d2f8
Parents: 4eb9e46
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Apr 20 23:56:53 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:04 2017 +0200
----------------------------------------------------------------------
.../PipelinedFailoverRegionBuildingTest.java | 644 +++++++++++++++++++
1 file changed, 644 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/166a3f87/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
new file mode 100644
index 0000000..55bf711
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests that make sure that the building of pipelined connected failover regions works
+ * correctly.
+ *
+ * <p>Regions are compared with {@code assertSame} / {@code assertNotSame}, because the
+ * strategy is expected to hand out the very same region object for all members of a region.
+ */
+public class PipelinedFailoverRegionBuildingTest extends TestLogger {
+
+ /**
+ * Tests that a graph consisting only of unconnected vertices puts every subtask into
+ * a failover region of its own.
+ *
+ * <pre>
+ * (v1)
+ *
+ * (v2)
+ *
+ * (v3)
+ *
+ * ...
+ * </pre>
+ */
+ @Test
+ public void testIndividualVertices() throws Exception {
+ final JobVertex source1 = createNoOpVertex("source1", 2);
+ final JobVertex source2 = createNoOpVertex("source2", 2);
+
+ final JobGraph jobGraph = new JobGraph("test job", source1, source2);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ final FailoverRegion source1Region0 = getRegion(eg, source1, 0);
+ final FailoverRegion source1Region1 = getRegion(eg, source1, 1);
+ final FailoverRegion source2Region0 = getRegion(eg, source2, 0);
+ final FailoverRegion source2Region1 = getRegion(eg, source2, 1);
+
+ // no two subtasks may share a region, neither within one vertex nor across vertices
+ assertNotSame(source1Region0, source1Region1);
+ assertNotSame(source1Region0, source2Region0);
+ assertNotSame(source1Region0, source2Region1);
+ assertNotSame(source1Region1, source2Region0);
+ assertNotSame(source1Region1, source2Region1);
+ assertNotSame(source2Region0, source2Region1);
+ }
+
+ /**
+ * Tests that embarrassingly parallel chains of vertices (pointwise, pipelined exchanges
+ * between vertices of equal parallelism) form exactly one failover region per chain.
+ *
+ * <pre>
+ * (a1) --> (b1) --> (c1)
+ *
+ * (a2) --> (b2) --> (c2)
+ *
+ * (a3) --> (b3) --> (c3)
+ *
+ * ...
+ * </pre>
+ */
+ @Test
+ public void testEmbarrassinglyParallelCase() throws Exception {
+ final int parallelism = 10000;
+ final JobVertex vertex1 = createNoOpVertex("vertex1", parallelism);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", parallelism);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", parallelism);
+
+ vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ // the first chain forms one region ...
+ final FailoverRegion firstChainRegion = getRegion(eg, vertex1, 0);
+ assertSame(firstChainRegion, getRegion(eg, vertex2, 0));
+ assertSame(firstChainRegion, getRegion(eg, vertex3, 0));
+
+ // ... and every other chain forms a region of its own, distinct from the first one
+ for (int i = 1; i < parallelism; ++i) {
+ final FailoverRegion chainRegion = getRegion(eg, vertex1, i);
+ assertSame(chainRegion, getRegion(eg, vertex2, i));
+ assertSame(chainRegion, getRegion(eg, vertex3, i));
+ assertNotSame(firstChainRegion, chainRegion);
+ }
+ }
+
+ /**
+ * Tests that a sequence of pipelined all-to-all exchanges connects all subtasks into
+ * one single failover region.
+ *
+ * <pre>
+ * (a1) -+-> (b1) -+-> (c1)
+ *       X         X
+ * (a2) -+-> (b2) -+-> (c2)
+ *       X         X
+ * (a3) -+-> (b3) -+-> (c3)
+ *
+ * ...
+ * </pre>
+ */
+ @Test
+ public void testOneComponentViaTwoExchanges() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 3);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 5);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 2);
+
+ vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ assertAllVerticesInOneRegion(eg);
+ }
+
+ /**
+ * Tests that a cascade of joins over pipelined all-to-all exchanges forms one single
+ * failover region.
+ *
+ * <p>Non-parallelized view:
+ * <pre>
+ * (1)--+
+ *      +--(5)-+
+ * (2)--+      |
+ *             +--(7)
+ * (3)--+      |
+ *      +--(6)-+
+ * (4)--+
+ * ...
+ * </pre>
+ */
+ @Test
+ public void testOneComponentViaCascadeOfJoins() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 8);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 8);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 8);
+ final JobVertex vertex4 = createNoOpVertex("vertex4", 8);
+ final JobVertex vertex5 = createNoOpVertex("vertex5", 4);
+ final JobVertex vertex6 = createNoOpVertex("vertex6", 4);
+ final JobVertex vertex7 = createNoOpVertex("vertex7", 2);
+
+ vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ assertAllVerticesInOneRegion(eg);
+ }
+
+ /**
+ * Tests that a tree of pipelined all-to-all exchanges fanning out from one single
+ * source vertex forms one single failover region.
+ *
+ * <p>Non-parallelized view:
+ * <pre>
+ *            +--(1)
+ *     +--(5)-+
+ *     |      +--(2)
+ * (7)-+
+ *     |      +--(3)
+ *     +--(6)-+
+ *            +--(4)
+ * ...
+ * </pre>
+ */
+ @Test
+ public void testOneComponentInstanceFromOneSource() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 8);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 8);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 8);
+ final JobVertex vertex4 = createNoOpVertex("vertex4", 8);
+ final JobVertex vertex5 = createNoOpVertex("vertex5", 4);
+ final JobVertex vertex6 = createNoOpVertex("vertex6", 4);
+ final JobVertex vertex7 = createNoOpVertex("vertex7", 2);
+
+ vertex1.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex2.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex3.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex4.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex5.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex6.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex7, vertex5, vertex6, vertex1, vertex2, vertex3, vertex4);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ assertAllVerticesInOneRegion(eg);
+ }
+
+ /**
+ * Tests that a blocking pointwise exchange splits the graph into separate failover regions:
+ * the pipelined all-to-all part forms one region, while each consumer of the blocking
+ * exchange gets a region of its own.
+ *
+ * <pre>
+ * (a1) -+-> (b1) ---> (c1)
+ *       X
+ * (a2) -+-> (b2) ---> (c2)
+ *       X
+ * (a3) -+-> (b3) ---> (c3)
+ *
+ *       ^          ^
+ *       |          |
+ *  (pipelined) (blocking)
+ * </pre>
+ */
+ @Test
+ public void testTwoComponentsViaBlockingExchange() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 3);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 2);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 2);
+
+ vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ final FailoverRegion region1 = getRegion(eg, vertex1, 1);
+ final FailoverRegion region2 = getRegion(eg, vertex2, 0);
+ final FailoverRegion region31 = getRegion(eg, vertex3, 0);
+ final FailoverRegion region32 = getRegion(eg, vertex3, 1);
+
+ assertSame(region1, region2);
+ assertNotSame(region2, region31);
+ assertNotSame(region2, region32);
+ assertNotSame(region31, region32);
+ }
+
+ /**
+ * Tests that a blocking all-to-all exchange splits the graph into separate failover regions:
+ * the pipelined all-to-all part forms one region, while each consumer of the blocking
+ * exchange gets a region of its own.
+ *
+ * <pre>
+ * (a1) -+-> (b1) -+-> (c1)
+ *       X         X
+ * (a2) -+-> (b2) -+-> (c2)
+ *       X         X
+ * (a3) -+-> (b3) -+-> (c3)
+ *
+ *       ^         ^
+ *       |         |
+ *  (pipelined) (blocking)
+ * </pre>
+ */
+ @Test
+ public void testTwoComponentsViaBlockingExchange2() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 3);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 2);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 2);
+
+ vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ final FailoverRegion region1 = getRegion(eg, vertex1, 1);
+ final FailoverRegion region2 = getRegion(eg, vertex2, 0);
+ final FailoverRegion region31 = getRegion(eg, vertex3, 0);
+ final FailoverRegion region32 = getRegion(eg, vertex3, 1);
+
+ assertSame(region1, region2);
+ assertNotSame(region2, region31);
+ assertNotSame(region2, region32);
+ assertNotSame(region31, region32);
+ }
+
+ /**
+ * Tests a cascade of joins with partially blocking, partially pipelined exchanges.
+ *
+ * <pre>
+ * (1)--+
+ *      +--(5)-+
+ * (2)--+      |
+ *          (block)
+ *             |
+ *             +--(7)
+ *             |
+ *          (block)
+ * (3)--+      |
+ *      +--(6)-+
+ * (4)--+
+ * ...
+ * </pre>
+ *
+ * <p>Expected regions: all subtasks of (1), (2) and (5) form one region, all subtasks of
+ * (3), (4) and (6) form another region, and every subtask of (7) forms a region of its own.
+ */
+ @Test
+ public void testMultipleComponentsViaCascadeOfJoins() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 8);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 8);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 8);
+ final JobVertex vertex4 = createNoOpVertex("vertex4", 8);
+ final JobVertex vertex5 = createNoOpVertex("vertex5", 4);
+ final JobVertex vertex6 = createNoOpVertex("vertex6", 4);
+ final JobVertex vertex7 = createNoOpVertex("vertex7", 2);
+
+ vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+ vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ // component (1), (2), (5)
+ final FailoverRegion region1 = getRegion(eg, vertex1, 0);
+ assertSame(region1, getRegion(eg, vertex2, 5));
+ assertSame(region1, getRegion(eg, vertex5, 2));
+
+ // component (3), (4), (6)
+ final FailoverRegion region3 = getRegion(eg, vertex3, 0);
+ assertSame(region3, getRegion(eg, vertex4, 5));
+ assertSame(region3, getRegion(eg, vertex6, 2));
+
+ // the two pipelined components are only connected through blocking exchanges
+ assertNotSame(region1, region3);
+
+ // each subtask of (7) has only blocking inputs and forms its own region
+ final FailoverRegion region71 = getRegion(eg, vertex7, 0);
+ final FailoverRegion region72 = getRegion(eg, vertex7, 1);
+
+ assertNotSame(region71, region72);
+ assertNotSame(region1, region71);
+ assertNotSame(region1, region72);
+ assertNotSame(region3, region71);
+ assertNotSame(region3, region72);
+ }
+
+ /**
+ * Tests a diamond in which one branch is blocking and all other exchanges are pipelined.
+ *
+ * <pre>
+ *         (blocking)
+ *       +-----------> (2) --+
+ *       |                   |
+ * (1) --+                   +--> (4)
+ *       |                   |
+ *       +-----------> (3) --+
+ *         (pipelined)
+ * </pre>
+ *
+ * <p>(1), (3) and (4) are connected via pipelined exchanges, and (2) is pulled into the
+ * same region through its pipelined exchange into (4), even though the exchange
+ * (1) to (2) is blocking. Hence all subtasks form one single failover region.
+ */
+ @Test
+ public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception {
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 8);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 8);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 8);
+ final JobVertex vertex4 = createNoOpVertex("vertex4", 8);
+
+ vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+ vertex3.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ vertex4.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ vertex4.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ assertAllVerticesInOneRegion(eg);
+ }
+
+ /**
+ * Tests that strictly co-located vertices are in the same failover region,
+ * even though they are connected via a blocking exchange.
+ * This is currently an assumption / limitation of the scheduler.
+ */
+ @Test
+ public void testBlockingAllToAllTopologyWithCoLocation() throws Exception {
+ final JobVertex source = createNoOpVertex("source", 10);
+ final JobVertex target = createNoOpVertex("target", 13);
+
+ target.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ source.setSlotSharingGroup(sharingGroup);
+ target.setSlotSharingGroup(sharingGroup);
+
+ source.setStrictlyCoLocatedWith(target);
+
+ final JobGraph jobGraph = new JobGraph("test job", source, target);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ // 'assertSame' tests for referential equality rather than 'equals()',
+ // to be on the safe side
+ assertSame(getRegion(eg, source, 0), getRegion(eg, target, 0));
+ }
+
+ /**
+ * Tests that in the presence of a co-location constraint all subtasks end up in one
+ * failover region. This includes different subtasks of the same vertex, which are only
+ * connected via a one-to-one (pointwise) pipelined exchange and would otherwise form
+ * separate regions. This is currently an assumption / limitation of the scheduler.
+ */
+ @Test
+ public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception {
+ final JobVertex source = createNoOpVertex("source", 10);
+ final JobVertex target = createNoOpVertex("target", 10);
+
+ target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ source.setSlotSharingGroup(sharingGroup);
+ target.setSlotSharingGroup(sharingGroup);
+
+ source.setStrictlyCoLocatedWith(target);
+
+ final JobGraph jobGraph = new JobGraph("test job", source, target);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ final FailoverRegion sourceRegion1 = getRegion(eg, source, 0);
+ final FailoverRegion sourceRegion2 = getRegion(eg, source, 1);
+ final FailoverRegion targetRegion1 = getRegion(eg, target, 0);
+ final FailoverRegion targetRegion2 = getRegion(eg, target, 1);
+
+ // 'assertSame' tests for referential equality rather than 'equals()',
+ // to be on the safe side
+ assertSame(sourceRegion1, sourceRegion2);
+ assertSame(sourceRegion2, targetRegion1);
+ assertSame(targetRegion1, targetRegion2);
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Builds an ExecutionGraph for the given JobGraph, configured with the pipelined
+ * region failover strategy and a mocked slot provider.
+ */
+ private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobException, JobExecutionException {
+ // configure the pipelined failover strategy
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setString(
+ JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
+ FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
+
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ jobManagerConfig,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ mock(SlotProvider.class),
+ PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(10),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ 1000,
+ log);
+ }
+
+ /**
+ * Creates a JobVertex with the given name and parallelism that runs a {@link NoOpInvokable}.
+ */
+ private static JobVertex createNoOpVertex(String name, int parallelism) {
+ final JobVertex vertex = new JobVertex(name);
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(parallelism);
+ return vertex;
+ }
+
+ /**
+ * Returns the failover region of the given subtask of the given job vertex.
+ * Throws a ClassCastException if the graph does not use the RestartPipelinedRegionStrategy.
+ */
+ private static FailoverRegion getRegion(ExecutionGraph eg, JobVertex jobVertex, int subtaskIndex) {
+ final RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+ return failoverStrategy.getFailoverRegion(eg.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex]);
+ }
+
+ /**
+ * Asserts that all execution vertices of the given graph belong to the very same
+ * failover region. Expects the graph to contain at least one execution vertex.
+ */
+ private static void assertAllVerticesInOneRegion(ExecutionGraph eg) {
+ final RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+ final Iterator<ExecutionVertex> evs = eg.getAllExecutionVertices().iterator();
+
+ final FailoverRegion firstRegion = failoverStrategy.getFailoverRegion(evs.next());
+
+ while (evs.hasNext()) {
+ assertSame(firstRegion, failoverStrategy.getFailoverRegion(evs.next()));
+ }
+ }
+}
[3/4] flink git commit: [FLINK-5867] [flip-1] Improve performance of
Pipelined Failover Region construction
Posted by se...@apache.org.
[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction
This method exploits the fact that vertices are already in topological order.
This closes #3773
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfd37ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfd37ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfd37ca
Branch: refs/heads/master
Commit: dcfd37ca69380de5afbf28ad946dde90ab0de722
Parents: 166a3f8
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 00:02:19 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:05 2017 +0200
----------------------------------------------------------------------
.../RestartPipelinedRegionStrategy.java | 170 +++++++++++++------
1 file changed, 115 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dcfd37ca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
index 0a5baa8..1884d1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -24,14 +24,16 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.Executor;
@@ -91,16 +93,20 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
if (failoverRegion == null) {
executionGraph.failGlobal(new FlinkException(
- "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex()));
+ "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
}
else {
+ LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region",
+ taskExecution.getVertex().getTaskNameWithSubtaskIndex(),
+ taskExecution.getAttemptNumber(),
+ taskExecution.getAttemptId());
+
failoverRegion.onExecutionFail(taskExecution, cause);
}
}
@Override
public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
- LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size());
generateAllFailoverRegion(newJobVerticesTopological);
}
@@ -109,74 +115,128 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
return "Pipelined Region Failover";
}
- // Generate all the FailoverRegion from the new added job vertexes
+ /**
+ * Generates the failover regions for the newly added job vertices (given in topological order).
+ */
private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
+ final IdentityHashMap<ExecutionVertex, ArrayList<ExecutionVertex>> vertexToRegion = new IdentityHashMap<>();
+
+ // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
+ final IdentityHashMap<ArrayList<ExecutionVertex>, Object> distinctRegions = new IdentityHashMap<>();
+
+ // every pipelined input edge is visited once; merging two regions additionally re-maps the merged-in vertices
+
for (ExecutionJobVertex ejv : newJobVerticesTopological) {
- for (ExecutionVertex ev : ejv.getTaskVertices()) {
- if (getFailoverRegion(ev) != null) {
- continue;
- }
- List<ExecutionVertex> pipelinedExecutions = new ArrayList<>();
- List<ExecutionVertex> orgExecutions = new ArrayList<>();
- orgExecutions.add(ev);
- pipelinedExecutions.add(ev);
- getAllPipelinedConnectedVertexes(orgExecutions, pipelinedExecutions);
-
- FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions);
- for (ExecutionVertex vertex : pipelinedExecutions) {
- vertexToRegion.put(vertex, region);
+
+ // currently, jobs with a co-location constraint fail as one
+ // we want to improve that in the future (or get rid of co-location constraints)
+ if (ejv.getCoLocationGroup() != null) {
+ makeAllOneRegion(newJobVerticesTopological);
+ return;
+ }
+
+ // see if this JobVertex has pipelined inputs at all
+ final List<IntermediateResult> inputs = ejv.getInputs();
+ final int numInputs = inputs.size();
+ boolean hasPipelinedInputs = false;
+
+ for (IntermediateResult input : inputs) {
+ if (input.getResultType().isPipelined()) {
+ hasPipelinedInputs = true;
+ break;
}
}
- }
- }
- /**
- * Get all connected executions of the original executions
- *
- * @param orgExecutions the original execution vertexes
- * @param connectedExecutions the total connected executions
- */
- private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) {
- List<ExecutionVertex> newAddedExecutions = new ArrayList<>();
- for (ExecutionVertex ev : orgExecutions) {
- // Add downstream ExecutionVertex
- for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) {
- if (irp.getIntermediateResult().getResultType().isPipelined()) {
- for (List<ExecutionEdge> consumers : irp.getConsumers()) {
- for (ExecutionEdge consumer : consumers) {
- ExecutionVertex cev = consumer.getTarget();
- if (!connectedExecutions.contains(cev)) {
- newAddedExecutions.add(cev);
+ if (hasPipelinedInputs) {
+ // build upon the predecessors
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+
+ // remember the region in which we are
+ ArrayList<ExecutionVertex> thisRegion = null;
+
+ for (int inputNum = 0; inputNum < numInputs; inputNum++) {
+ if (inputs.get(inputNum).getResultType().isPipelined()) {
+
+ for (ExecutionEdge edge : ev.getInputEdges(inputNum)) {
+ final ExecutionVertex predecessor = edge.getSource().getProducer();
+ final ArrayList<ExecutionVertex> predecessorRegion = vertexToRegion.get(predecessor);
+
+ if (thisRegion != null) {
+ // we already have a region. see if it is the same as the predecessor's region
+ if (predecessorRegion != thisRegion) {
+
+ // we need to merge our region and the predecessor's region
+ thisRegion.addAll(predecessorRegion);
+ distinctRegions.remove(predecessorRegion);
+
+ // remap the vertices from that merged region
+ for (ExecutionVertex inPredRegion: predecessorRegion) {
+ vertexToRegion.put(inPredRegion, thisRegion);
+ }
+ }
+ }
+ else if (predecessorRegion != null) {
+ // first case, make this our region
+ thisRegion = predecessorRegion;
+ thisRegion.add(ev);
+ vertexToRegion.put(ev, thisRegion);
+ }
+ else {
+ // the predecessor has no region although it should have been processed before;
+ // this is a bug and not a recoverable situation, so throw an uncaught exception
+ throw new FlinkRuntimeException(
+ "bug in the logic to construct the pipelined failover regions");
+ }
}
}
}
}
}
- if (!newAddedExecutions.isEmpty()) {
- connectedExecutions.addAll(newAddedExecutions);
- getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
- newAddedExecutions.clear();
- }
- // Add upstream ExecutionVertex
- int inputNum = ev.getNumberOfInputs();
- for (int i = 0; i < inputNum; i++) {
- for (ExecutionEdge input : ev.getInputEdges(i)) {
- if (input.getSource().getIntermediateResult().getResultType().isPipelined()) {
- ExecutionVertex pev = input.getSource().getProducer();
- if (!connectedExecutions.contains(pev)) {
- newAddedExecutions.add(pev);
- }
- }
+ else {
+ // no pipelined inputs, start a new region
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ ArrayList<ExecutionVertex> region = new ArrayList<>(1);
+ region.add(ev);
+ vertexToRegion.put(ev, region);
+ distinctRegions.put(region, null);
}
}
- if (!newAddedExecutions.isEmpty()) {
- connectedExecutions.addAll(0, newAddedExecutions);
- getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
- newAddedExecutions.clear();
+ }
+
+ // now that we have all regions, create the failover region objects
+ LOG.info("Creating {} individual failover regions for job {} ({})",
+ distinctRegions.size(), executionGraph.getJobName(), executionGraph.getJobID());
+
+ for (List<ExecutionVertex> region : distinctRegions.keySet()) {
+ final FailoverRegion failoverRegion = new FailoverRegion(executionGraph, executor, region);
+ for (ExecutionVertex ev : region) {
+ this.vertexToRegion.put(ev, failoverRegion);
}
}
}
+ private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) {
+ LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of " +
+ "Co-Location constraints (iterations). Job will fail over as one holistic unit.");
+
+ final ArrayList<ExecutionVertex> allVertices = new ArrayList<>();
+
+ for (ExecutionJobVertex ejv : jobVertices) {
+
+ // save some incremental size growing
+ allVertices.ensureCapacity(allVertices.size() + ejv.getParallelism());
+
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ allVertices.add(ev);
+ }
+ }
+
+ final FailoverRegion singleRegion = new FailoverRegion(executionGraph, executor, allVertices);
+ for (ExecutionVertex ev : allVertices) {
+ vertexToRegion.put(ev, singleRegion);
+ }
+ }
+
// ------------------------------------------------------------------------
// testing
// ------------------------------------------------------------------------
[2/4] flink git commit: [FLINK-5867] [flip-1] Support restarting only
pipelined sub-regions of the ExecutionGraph on task failure
Posted by se...@apache.org.
[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb9e46e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb9e46e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb9e46e
Branch: refs/heads/master
Commit: 4eb9e46ee31e3f003c1a92322d13056cb4d4cfd5
Parents: b01d737
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Apr 18 14:15:29 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:04 2017 +0200
----------------------------------------------------------------------
.../executiongraph/failover/FailoverRegion.java | 251 ++++++++++
.../failover/FailoverStrategyLoader.java | 8 +-
.../RestartPipelinedRegionStrategy.java | 206 ++++++++
.../executiongraph/ExecutionGraphTestUtils.java | 38 +-
.../executiongraph/FailoverRegionTest.java | 490 +++++++++++++++++++
.../PipelinedRegionFailoverConcurrencyTest.java | 353 +++++++++++++
.../RestartPipelinedRegionStrategyTest.java | 396 +++++++++++++++
7 files changed, 1731 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
new file mode 100644
index 0000000..b36cfcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * A region starts in state RUNNING. On failover it switches from RUNNING to CANCELLING while its
+ * vertices are being cancelled, to CANCELED once all of them reached a terminal state, to CREATED
+ * after they have been reset, and at last back to RUNNING when they are scheduled again.
+ */
+public class FailoverRegion {
+
+ private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+ /** The log object used for debugging. */
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+ // ------------------------------------------------------------------------
+
+ /** a unique id for debugging */
+ private final AbstractID id = new AbstractID();
+
+ /** The execution graph whose restart strategy, slot provider and global failure this region uses */
+ private final ExecutionGraph executionGraph;
+
+ /** The vertices of this region; they are cancelled, reset and rescheduled together */
+ private final List<ExecutionVertex> connectedExecutionVertexes;
+
+ /** The executor that executes the recovery action after all vertices are in a terminal state */
+ private final Executor executor;
+
+ /** Current status of this failover region */
+ private volatile JobStatus state = JobStatus.RUNNING;
+
+ /**
+ * Creates a new failover region, initially in state RUNNING.
+ *
+ * @param executionGraph the execution graph the region's vertices are part of
+ * @param executor the executor that runs the recovery action once all vertices are terminal
+ * @param connectedExecutions the execution vertices that form this region
+ */
+ public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+ this.executionGraph = checkNotNull(executionGraph);
+ this.executor = checkNotNull(executor);
+ this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+ LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+ }
+
+ /**
+ * Called when an execution of this region failed. If the restart strategy does not allow a
+ * restart, the whole job is failed globally; otherwise the region is cancelled (and afterwards
+ * reset and rescheduled).
+ *
+ * @param taskExecution the failed execution
+ * @param cause the cause of the failure
+ */
+ public void onExecutionFail(Execution taskExecution, Throwable cause) {
+ // TODO: check if need to failover the preceding region
+ if (!executionGraph.getRestartStrategy().canRestart()) {
+ // delegate the failure to a global fail that will check the restart strategy and not restart
+ executionGraph.failGlobal(cause);
+ }
+ else {
+ cancel(taskExecution.getGlobalModVersion());
+ }
+ }
+
+ /**
+ * Called (on the executor) once all vertices reached a terminal state after cancellation.
+ * Switches from CANCELLING to CANCELED and then resets the region.
+ */
+ private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+ while (true) {
+ JobStatus curStatus = this.state;
+ if (curStatus.equals(JobStatus.CANCELLING)) {
+ if (transitionState(curStatus, JobStatus.CANCELED)) {
+ reset(globalModVersionOfFailover);
+ break;
+ }
+ }
+ else {
+ LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", id, curStatus);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Returns the current state of this failover region.
+ */
+ public JobStatus getState() {
+ return state;
+ }
+
+ /**
+ * get all execution vertexes contained in this region
+ */
+ public List<ExecutionVertex> getAllExecutionVertexes() {
+ return connectedExecutionVertexes;
+ }
+
+ // Triggers a (new) failover of this region depending on its current state,
+ // or fails the whole job if the restart strategy does not allow a restart.
+ private void failover(long globalModVersionOfFailover) {
+ if (!executionGraph.getRestartStrategy().canRestart()) {
+ executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
+ }
+ else {
+ JobStatus curStatus = this.state;
+ if (curStatus.equals(JobStatus.RUNNING)) {
+ cancel(globalModVersionOfFailover);
+ }
+ else if (curStatus.equals(JobStatus.CANCELED)) {
+ reset(globalModVersionOfFailover);
+ }
+ else {
+ LOG.info("FailoverRegion {} is {} when notified to failover.", id, curStatus);
+ }
+ }
+ }
+
+ // cancel all executions in this sub graph (RUNNING -> CANCELLING)
+ private void cancel(final long globalModVersionOfFailover) {
+ while (true) {
+ JobStatus curStatus = this.state;
+ if (curStatus.equals(JobStatus.RUNNING)) {
+ if (transitionState(curStatus, JobStatus.CANCELLING)) {
+
+ // we build a future that is complete once all vertices have reached a terminal state
+ final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
+
+ // cancel all tasks (that still need cancelling)
+ for (ExecutionVertex vertex : connectedExecutionVertexes) {
+ futures.add(vertex.cancel());
+ }
+
+ final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+ allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ allVerticesInTerminalState(globalModVersionOfFailover);
+ }
+ }, executor);
+
+ break;
+ }
+ }
+ else {
+ LOG.info("FailoverRegion {} is {} when cancel.", id, curStatus);
+ break;
+ }
+ }
+ }
+
+ // reset all executions in this sub graph (CANCELED -> CREATED)
+ private void reset(long globalModVersionOfFailover) {
+ try {
+ // reset all connected ExecutionVertexes
+ final Collection<CoLocationGroup> colGroups = new HashSet<>();
+ final long restartTimestamp = System.currentTimeMillis();
+
+ for (ExecutionVertex ev : connectedExecutionVertexes) {
+ CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
+ // reset the constraints of each co-location group only once
+ if (cgroup != null && colGroups.add(cgroup)) {
+ cgroup.resetConstraints();
+ }
+
+ ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover);
+ }
+ if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
+ restart(globalModVersionOfFailover);
+ }
+ else {
+ LOG.info("FailoverRegion {} switched from CANCELED to CREATED fail, will fail this region again.", id);
+ failover(globalModVersionOfFailover);
+ }
+ }
+ catch (GlobalModVersionMismatch e) {
+ // happens when a global recovery happens concurrently to the regional recovery
+ // go back to a clean state
+ state = JobStatus.RUNNING;
+ }
+ catch (Throwable e) {
+ LOG.info("FailoverRegion {} reset fail, will failover again.", id, e);
+ failover(globalModVersionOfFailover);
+ }
+ }
+
+ // restart all executions in this sub graph (CREATED -> RUNNING)
+ private void restart(long globalModVersionOfFailover) {
+ try {
+ if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+ // if we have checkpointed state, reload it into the executions
+ //TODO: checkpoint support restore part ExecutionVertex cp
+ /**
+ if (executionGraph.getCheckpointCoordinator() != null) {
+ executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+ connectedExecutionVertexes, false, false);
+ }
+ */
+ //TODO, use restart strategy to schedule them.
+ //restart all connected ExecutionVertexes
+ for (ExecutionVertex ev : connectedExecutionVertexes) {
+ try {
+ ev.scheduleForExecution(
+ executionGraph.getSlotProvider(),
+ executionGraph.isQueuedSchedulingAllowed());
+ }
+ catch (Throwable e) {
+ // do not lose the scheduling failure; it triggers another failover of the region
+ LOG.info("FailoverRegion {} failed to schedule {}, will failover again.",
+ id, ev.getTaskNameWithSubtaskIndex(), e);
+ failover(globalModVersionOfFailover);
+ }
+ }
+ }
+ else {
+ LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", id);
+ failover(globalModVersionOfFailover);
+ }
+ } catch (Exception e) {
+ LOG.info("FailoverRegion {} restart failed, failover again.", id, e);
+ failover(globalModVersionOfFailover);
+ }
+ }
+
+ /**
+ * Atomically switches the state from {@code current} to {@code newState}.
+ *
+ * @return true if the switch happened, false if the state was not {@code current}
+ */
+ private boolean transitionState(JobStatus current, JobStatus newState) {
+ if (STATE_UPDATER.compareAndSet(this, current, newState)) {
+ LOG.info("FailoverRegion {} switched from state {} to {}.", id, current, newState);
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index f18a90f..8b6fa6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -35,9 +35,12 @@ public class FailoverStrategyLoader {
/** Config name for the {@link RestartAllStrategy} */
public static final String FULL_RESTART_STRATEGY_NAME = "full";
- /** Config name for the strategy that restarts individual tasks */
+ /** Config name for the {@link RestartIndividualStrategy} */
public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
+ /** Config name for the {@link RestartPipelinedRegionStrategy} */
+ public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";
+
// ------------------------------------------------------------------------
/**
@@ -59,6 +62,9 @@ public class FailoverStrategyLoader {
case FULL_RESTART_STRATEGY_NAME:
return new RestartAllStrategy.Factory();
+ case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+ return new RestartPipelinedRegionStrategy.Factory();
+
case INDIVIDUAL_RESTART_STRATEGY_NAME:
return new RestartIndividualStrategy.Factory();
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
new file mode 100644
index 0000000..0a5baa8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that restarts regions of the ExecutionGraph. A region is defined
+ * by this strategy as the weakly connected component of tasks that communicate via pipelined
+ * data exchange.
+ */
+public class RestartPipelinedRegionStrategy extends FailoverStrategy {
+
+ /** The log object used for debugging. */
+ private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+ /** The execution graph on which this FailoverStrategy works */
+ private final ExecutionGraph executionGraph;
+
+ /** The executor used for future actions */
+ private final Executor executor;
+
+ /** Fast lookup from vertex to failover region */
+ private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
+
+
+ /**
+ * Creates a new failover strategy to restart pipelined regions that works on the given
+ * execution graph and uses the execution graph's future executor to call restart actions.
+ *
+ * @param executionGraph The execution graph on which this FailoverStrategy will work
+ */
+ public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) {
+ this(executionGraph, executionGraph.getFutureExecutor());
+ }
+
+ /**
+ * Creates a new failover strategy to restart pipelined regions that works on the given
+ * execution graph and uses the given executor to call restart actions.
+ *
+ * @param executionGraph The execution graph on which this FailoverStrategy will work
+ * @param executor The executor used for future actions
+ */
+ public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor) {
+ this.executionGraph = checkNotNull(executionGraph);
+ this.executor = checkNotNull(executor);
+ this.vertexToRegion = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // failover implementation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Delegates the task failure to the failover region of the failed vertex, or fails the
+ * whole job if the vertex belongs to no region.
+ */
+ @Override
+ public void onTaskFailure(Execution taskExecution, Throwable cause) {
+ final ExecutionVertex ev = taskExecution.getVertex();
+ final FailoverRegion failoverRegion = vertexToRegion.get(ev);
+
+ if (failoverRegion == null) {
+ // keep the original failure as the cause, so that it is not lost
+ executionGraph.failGlobal(new FlinkException(
+ "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
+ }
+ else {
+ failoverRegion.onExecutionFail(taskExecution, cause);
+ }
+ }
+
+ @Override
+ public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+ LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size());
+ generateAllFailoverRegion(newJobVerticesTopological);
+ }
+
+ @Override
+ public String getStrategyName() {
+ return "Pipelined Region Failover";
+ }
+
+ /**
+ * Generates the failover regions for the newly added job vertices. Every vertex that does not
+ * belong to a region yet seeds a new region, which consists of all vertices connected to it
+ * via pipelined data exchange (in either direction).
+ */
+ private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
+ for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ if (getFailoverRegion(ev) != null) {
+ continue;
+ }
+ final List<ExecutionVertex> pipelinedExecutions = getAllPipelinedConnectedVertexes(ev);
+
+ final FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions);
+ for (ExecutionVertex vertex : pipelinedExecutions) {
+ vertexToRegion.put(vertex, region);
+ }
+ }
+ }
+ }
+
+ /**
+ * Collects all execution vertices that are connected to the given vertex through pipelined
+ * intermediate results, following produced partitions (downstream) as well as input edges
+ * (upstream). The traversal is iterative and remembers visited vertices in a hash map, so each
+ * vertex appears exactly once in the result even if it is reachable via several edges.
+ *
+ * @param start the vertex from which the search starts
+ * @return the start vertex and all vertices pipelined-connected to it, each exactly once
+ */
+ private static List<ExecutionVertex> getAllPipelinedConnectedVertexes(ExecutionVertex start) {
+ // the result doubles as the work list: all entries before 'next' have been expanded
+ final List<ExecutionVertex> connected = new ArrayList<>();
+ // used as a set: put(...) returns null exactly when the vertex was not visited before
+ final HashMap<ExecutionVertex, Boolean> visited = new HashMap<>();
+
+ connected.add(start);
+ visited.put(start, Boolean.TRUE);
+
+ for (int next = 0; next < connected.size(); next++) {
+ final ExecutionVertex ev = connected.get(next);
+
+ // downstream: consumers of the pipelined partitions produced by this vertex
+ for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) {
+ if (irp.getIntermediateResult().getResultType().isPipelined()) {
+ for (List<ExecutionEdge> consumers : irp.getConsumers()) {
+ for (ExecutionEdge consumer : consumers) {
+ final ExecutionVertex target = consumer.getTarget();
+ if (visited.put(target, Boolean.TRUE) == null) {
+ connected.add(target);
+ }
+ }
+ }
+ }
+ }
+
+ // upstream: producers of the pipelined partitions consumed by this vertex
+ final int numInputs = ev.getNumberOfInputs();
+ for (int i = 0; i < numInputs; i++) {
+ for (ExecutionEdge input : ev.getInputEdges(i)) {
+ if (input.getSource().getIntermediateResult().getResultType().isPipelined()) {
+ final ExecutionVertex producer = input.getSource().getProducer();
+ if (visited.put(producer, Boolean.TRUE) == null) {
+ connected.add(producer);
+ }
+ }
+ }
+ }
+ }
+
+ return connected;
+ }
+
+ // ------------------------------------------------------------------------
+ // testing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Finds the failover region that contains the given execution vertex.
+ */
+ @VisibleForTesting
+ public FailoverRegion getFailoverRegion(ExecutionVertex ev) {
+ return vertexToRegion.get(ev);
+ }
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory that instantiates the RestartPipelinedRegionStrategy.
+ */
+ public static class Factory implements FailoverStrategy.Factory {
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return new RestartPipelinedRegionStrategy(executionGraph);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 0d7e389..140e984 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -137,6 +138,27 @@ public class ExecutionGraphTestUtils {
}
}
+ public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis)
+ throws TimeoutException {
+
+ checkNotNull(region);
+ checkNotNull(status);
+ checkArgument(maxWaitMillis >= 0);
+
+ // this is a poor implementation - we may want to improve it eventually
+ final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+ while (region.getState() != status && System.nanoTime() < deadline) {
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException ignored) {}
+ }
+
+ if (System.nanoTime() >= deadline) {
+ throw new TimeoutException();
+ }
+ }
+
/**
* Takes all vertices in the given ExecutionGraph and switches their current
* execution to RUNNING.
@@ -320,21 +342,17 @@ public class ExecutionGraphTestUtils {
+ /**
+ * Answers a {@code SubmitTask} (after storing {@code submitTask.tasks()} in {@code lastTDD}) and a
+ * {@code CancelTask} with {@code Acknowledge.get()}, a {@code FailIntermediateResultPartitions}
+ * with a plain {@code Object}, and every other message with {@code null}.
+ */
@Override
public Object handleMessage(Object message) {
- Object result = null;
- if(message instanceof SubmitTask) {
+ if (message instanceof SubmitTask) {
SubmitTask submitTask = (SubmitTask) message;
lastTDD = submitTask.tasks();
-
- result = Acknowledge.get();
+ return Acknowledge.get();
} else if(message instanceof CancelTask) {
- CancelTask cancelTask = (CancelTask) message;
-
- result = Acknowledge.get();
+ return Acknowledge.get();
} else if(message instanceof FailIntermediateResultPartitions) {
- result = new Object();
+ return new Object();
+ } else {
+ return null;
}
-
- return result;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
new file mode 100644
index 0000000..beeefb1b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how the failover regions of the {@link RestartPipelinedRegionStrategy} react to task
+ * failures: a failed region goes to CANCELLING and back to RUNNING once its tasks have been
+ * cancelled, without affecting the other regions.
+ */
+public class FailoverRegionTest extends TestLogger {
+
+ /**
+ * Tests that a job with only one failover region can recover from a task failure successfully.
+ * @throws Exception
+ */
+ @Test
+ public void testSingleRegionFailover() throws Exception {
+ RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(10);
+ ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy);
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState());
+
+ ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState());
+
+ for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
+ evs.getCurrentExecutionAttempt().cancelingComplete();
+ }
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState());
+ }
+
+ /**
+ * Tests that a job has several failover regions and that the failover of one region
+ * does not influence the others.
+ *
+ * <pre>
+ * (a1) ---> (b1) -+-> (c1) ---+-> (d1)
+ * X /
+ * (a2) ---> (b2) -+-> (c2) -+
+ *
+ * ^ ^ ^
+ * | | |
+ * (pipelined) (blocking) (pipelined)
+ *
+ * </pre>
+ */
+ @Test
+ public void testMultiRegionsFailover() throws Exception {
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+
+ final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+ JobVertex v1 = new JobVertex("vertex1");
+ JobVertex v2 = new JobVertex("vertex2");
+ JobVertex v3 = new JobVertex("vertex3");
+ JobVertex v4 = new JobVertex("vertex4");
+
+ v1.setParallelism(2);
+ v2.setParallelism(2);
+ v3.setParallelism(2);
+ v4.setParallelism(1);
+
+ v1.setInvokableClass(AbstractInvokable.class);
+ v2.setInvokableClass(AbstractInvokable.class);
+ v3.setInvokableClass(AbstractInvokable.class);
+ v4.setInvokableClass(AbstractInvokable.class);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jobId,
+ jobName,
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ slotProvider,
+ ExecutionGraph.class.getClassLoader());
+
+ eg.attachJobGraph(ordered);
+
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ // the following two vertices are in the same failover region
+ ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+ ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+
+ // the following two vertices are in the same failover region
+ ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+ ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+
+ // the following vertices are in one failover region (c1, c2 and d1 in the diagram)
+ ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+ ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+ ExecutionVertex ev4 = eg.getJobVertex(v4.getID()).getTaskVertices()[0];
+
+ eg.scheduleForExecution();
+
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+
+ ev21.scheduleForExecution(slotProvider, true);
+ ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+ ev11.getCurrentExecutionAttempt().cancelingComplete();
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+ ev11.getCurrentExecutionAttempt().markFinished();
+ ev21.getCurrentExecutionAttempt().markFinished();
+ ev22.scheduleForExecution(slotProvider, true);
+ ev22.getCurrentExecutionAttempt().markFinished();
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+ waitUntilExecutionState(ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+ waitUntilExecutionState(ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+
+ ev31.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
+
+ ev32.getCurrentExecutionAttempt().cancelingComplete();
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+ }
+
+ /**
+ * Tests that when a task fails and the restart strategy does not support restarting,
+ * the job goes to FAILED.
+ * @throws Exception
+ */
+ @Test
+ public void testNoManualRestart() throws Exception {
+ NoRestartStrategy restartStrategy = new NoRestartStrategy();
+ ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy);
+
+ ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+ ev.fail(new Exception("Test Exception"));
+
+ for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
+ evs.getCurrentExecutionAttempt().cancelingComplete();
+ }
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ /**
+ * Tests that when two failover regions fail over at the same time, they do not influence each other.
+ * @throws Exception
+ */
+ @Test
+ public void testMutilRegionFailoverAtSameTime() throws Exception {
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new ActorTaskManagerGateway(
+ new SimpleActorGateway(TestingUtils.directExecutionContext())),
+ 16);
+
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ scheduler.newInstanceAvailable(instance);
+
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ JobVertex v1 = new JobVertex("vertex1");
+ JobVertex v2 = new JobVertex("vertex2");
+ JobVertex v3 = new JobVertex("vertex3");
+ JobVertex v4 = new JobVertex("vertex4");
+
+ v1.setParallelism(2);
+ v2.setParallelism(2);
+ v3.setParallelism(2);
+ v4.setParallelism(2);
+
+ v1.setInvokableClass(AbstractInvokable.class);
+ v2.setInvokableClass(AbstractInvokable.class);
+ v3.setInvokableClass(AbstractInvokable.class);
+ v4.setInvokableClass(AbstractInvokable.class);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jobId,
+ jobName,
+ cfg,
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ new RestartPipelinedRegionStrategy.Factory(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ scheduler,
+ ExecutionGraph.class.getClassLoader());
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+ eg.scheduleForExecution();
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+ ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+ ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+ ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+ ev11.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ ev31.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
+
+ ev32.getCurrentExecutionAttempt().cancelingComplete();
+ waitUntilFailoverRegionState(strategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000);
+
+ ev12.getCurrentExecutionAttempt().cancelingComplete();
+ waitUntilFailoverRegionState(strategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000);
+ }
+
+ /**
+ * Tests that if a task reports that the result of its preceding task has failed,
+ * the preceding task is considered as failed and its region starts to fail over.
+ * TODO: as the report part is not finished yet, this case is ignored temporarily
+ * @throws Exception
+ */
+ @Ignore
+ @Test
+ public void testSucceedingNoticePreceding() throws Exception {
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new ActorTaskManagerGateway(
+ new SimpleActorGateway(TestingUtils.directExecutionContext())),
+ 14);
+
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ scheduler.newInstanceAvailable(instance);
+
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ JobVertex v1 = new JobVertex("vertex1");
+ JobVertex v2 = new JobVertex("vertex2");
+
+ v1.setParallelism(1);
+ v2.setParallelism(1);
+
+ v1.setInvokableClass(AbstractInvokable.class);
+ v2.setInvokableClass(AbstractInvokable.class);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jobId,
+ jobName,
+ cfg,
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ scheduler,
+ ExecutionGraph.class.getClassLoader());
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ // the producer (v1) and the consumer (v2) are separated by a blocking edge
+ ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+ ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+ ev21.getCurrentExecutionAttempt().fail(new Exception("Fail with v1"));
+
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev21).getState());
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+ }
+
+ /**
+ * Tests that a new failure arriving while the failover region is CANCELLING keeps the
+ * region in CANCELLING and the job RUNNING.
+ * @throws Exception
+ */
+ @Test
+ public void testFailWhileCancelling() throws Exception {
+ RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+ ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy);
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ Iterator<ExecutionVertex> iter = eg.getAllExecutionVertices().iterator();
+ ExecutionVertex ev1 = iter.next();
+ ev1.getCurrentExecutionAttempt().switchToRunning();
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState());
+
+ ev1.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState());
+
+ ExecutionVertex ev2 = iter.next();
+ ev2.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState());
+ }
+
+ /**
+ * Tests that a new failure arriving after the failover region has been restarted
+ * puts the region into CANCELLING again.
+ * @throws Exception
+ */
+ @Test
+ public void testFailWhileRestarting() throws Exception {
+ RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+ ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy);
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+ Iterator<ExecutionVertex> iter = eg.getAllExecutionVertices().iterator();
+ ExecutionVertex ev1 = iter.next();
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState());
+
+ ev1.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState());
+
+ for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
+ evs.getCurrentExecutionAttempt().cancelingComplete();
+ }
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState());
+
+ ev1.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState());
+ }
+
+ /**
+ * Creates and schedules an execution graph whose vertices all belong to one pipelined
+ * failover region: vertex1 (parallelism 3) and vertex2 (parallelism 2) both feed
+ * vertex3 (parallelism 2) through all-to-all pipelined edges.
+ *
+ * @param restartStrategy the restart strategy of the created graph
+ * @return the scheduled execution graph
+ */
+ private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new ActorTaskManagerGateway(
+ new SimpleActorGateway(TestingUtils.directExecutionContext())),
+ 14);
+
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ scheduler.newInstanceAvailable(instance);
+
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ JobVertex v1 = new JobVertex("vertex1");
+ JobVertex v2 = new JobVertex("vertex2");
+ JobVertex v3 = new JobVertex("vertex3");
+
+ v1.setParallelism(3);
+ v2.setParallelism(2);
+ v3.setParallelism(2);
+
+ v1.setInvokableClass(AbstractInvokable.class);
+ v2.setInvokableClass(AbstractInvokable.class);
+ v3.setInvokableClass(AbstractInvokable.class);
+
+ v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jobId,
+ jobName,
+ cfg,
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ restartStrategy,
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ scheduler,
+ ExecutionGraph.class.getClassLoader());
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ eg.scheduleForExecution();
+ return eg;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory to create a RestartPipelinedRegionStrategy that uses a
+ * direct (synchronous) executor for easier testing.
+ */
+ private static class FailoverPipelinedRegionWithDirectExecutor implements Factory {
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
new file mode 100644
index 0000000..635ec75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests make sure that global failover (restart all) always takes precedence over
+ * local recovery strategies for the {@link RestartPipelinedRegionStrategy}
+ *
+ * <p>This test must be in the package it resides in, because it uses package-private methods
+ * from the ExecutionGraph classes.
+ */
+public class PipelinedRegionFailoverConcurrencyTest {
+
+ /**
+ * Tests that a cancellation concurrent to a local failover leads to a properly
+ * cancelled state.
+ */
+ @Test
+ public void testCancelWhileInLocalFailover() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cancel the job to go into cancelling
+ // - resume in local recovery action
+ // - validate that this does in fact not start a new task, because the graph as a
+ // whole should now be cancelled already
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new FailoverPipelinedRegionWithCustomExecutor(executor),
+ new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+ slotProvider,
+ parallelism);
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now cancel the job
+ graph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue
+ executor.trigger();
+
+ // now report that cancelling is complete for the other vertex
+ vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+ assertEquals(JobStatus.CANCELED, graph.getState());
+ assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+ assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+ // make sure all slots are recycled
+ assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ /**
+ * Tests that a terminal global failure concurrent to a local failover
+ * leads to a properly failed state.
+ */
+ @Test
+ public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cause a global failure
+ // - resume in local recovery action
+ // - validate that this does in fact not start a new task, because the graph as a
+ // whole should now be terminally failed already
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new FailoverPipelinedRegionWithCustomExecutor(executor),
+ new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+ slotProvider,
+ parallelism);
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now fail the job globally with a failure that suppresses restarts
+ graph.failGlobal(new SuppressRestartsException(new Exception("test exception")));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue
+ executor.trigger();
+
+ // now report that cancelling is complete for the other vertex
+ vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+ assertEquals(JobStatus.FAILED, graph.getState());
+ assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+ assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+ // make sure all slots are recycled
+ assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ /**
+ * Tests that a local failover does not try to trump a global failover.
+ */
+ @Test
+ public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cause a global failure that is recovering immediately
+ // - resume in local recovery action
+ // - validate that this does in fact not cause another task restart, because the global
+ // recovery should already have restarted the task graph
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new FailoverPipelinedRegionWithCustomExecutor(executor),
+ new FixedDelayRestartStrategy(2, 0), // at most two restarts, no delay
+ slotProvider,
+ parallelism);
+ RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)graph.getFailoverStrategy();
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(vertex2).getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now fail the job globally - this failure is recoverable and triggers a global restart
+ graph.failGlobal(new Exception("test exception"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex1.getCurrentExecutionAttempt().getState());
+
+ // now report that cancelling is complete for the other vertex
+ vertex1.getCurrentExecutionAttempt().cancelingComplete();
+
+ waitUntilJobStatus(graph, JobStatus.RUNNING, 1000);
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+ waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+ vertex1.getCurrentExecutionAttempt().switchToRunning();
+ vertex2.getCurrentExecutionAttempt().switchToRunning();
+ assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue - this should do nothing any more
+ executor.trigger();
+
+ // validate that the graph is still peachy
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState());
+ assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+ assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size());
+
+ // make sure all slots are in use
+ assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+
+ // validate that a task failure then can be handled by the local recovery
+ vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // let the local recovery action continue - this should recover the vertex2
+ executor.trigger();
+
+ waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+ vertex2.getCurrentExecutionAttempt().switchToRunning();
+
+ // validate the local recovery result
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+ assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState());
+ assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(2, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+ assertEquals(2, vertex2.getCopyOfPriorExecutionsList().size());
+
+ // make sure all slots are in use
+ assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Builds an execution graph with a single job vertex (running {@link NoOpInvokable})
+ * of the given parallelism. The graph is attached but not scheduled.
+ *
+ * @param jid the job id of the graph
+ * @param failoverStrategy factory for the failover strategy of the graph
+ * @param restartStrategy the restart strategy of the graph
+ * @param slotProvider the slot provider used to schedule the tasks
+ * @param parallelism the parallelism of the single job vertex
+ * @return the created execution graph
+ */
+ private ExecutionGraph createSampleGraph(
+ JobID jid,
+ Factory failoverStrategy,
+ RestartStrategy restartStrategy,
+ SlotProvider slotProvider,
+ int parallelism) throws Exception {
+
+ // build a simple execution graph with one job vertex of the given parallelism
+ final ExecutionGraph graph = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jid,
+ "test job",
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ Time.seconds(10),
+ restartStrategy,
+ failoverStrategy,
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ slotProvider,
+ getClass().getClassLoader());
+
+ JobVertex jv = new JobVertex("test vertex");
+ jv.setInvokableClass(NoOpInvokable.class);
+ jv.setParallelism(parallelism);
+
+ JobGraph jg = new JobGraph(jid, "testjob", jv);
+ graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+ return graph;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory to create a RestartPipelinedRegionStrategy that runs its recovery
+ * actions on the given executor.
+ */
+ private static class FailoverPipelinedRegionWithCustomExecutor implements Factory {
+
+ private final Executor executor;
+
+ FailoverPipelinedRegionWithCustomExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return new RestartPipelinedRegionStrategy(executionGraph, executor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
new file mode 100644
index 0000000..45aabe6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how {@link RestartPipelinedRegionStrategy} groups the execution vertices of a job into
+ * {@link FailoverRegion}s: subtasks connected through pipelined results are expected to share a
+ * region, while blocking results are expected to separate regions.
+ */
+public class RestartPipelinedRegionStrategyTest {
+
+	/**
+	 * Creates a JobGraph of the following form, in which every edge is pipelined and all-to-all:
+	 *
+	 * <pre>
+	 *     v1--->v2-->\
+	 *                 \
+	 *                  v4 --->\
+	 *                 /        \
+	 *     v3------->/           v5
+	 *         \                /
+	 *          -------------->/
+	 * </pre>
+	 *
+	 * <p>Since all results are pipelined, every subtask must end up in the same failover region.
+	 */
+	@Test
+	public void testSimpleFailoverRegion() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 5);
+		JobVertex v2 = createVertex("vertex2", 7);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 11);
+		JobVertex v5 = createVertex("vertex5", 4);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		ExecutionGraph eg = createExecutionGraph(Arrays.asList(v1, v2, v3, v4, v5));
+
+		FailoverRegion region1 = getRegion(eg, v1, 2);
+		FailoverRegion region2 = getRegion(eg, v2, 3);
+		FailoverRegion region3 = getRegion(eg, v3, 0);
+		FailoverRegion region4 = getRegion(eg, v4, 4);
+		FailoverRegion region5 = getRegion(eg, v5, 1);
+
+		// all in one failover region
+		assertEquals(region1, region2);
+		assertEquals(region3, region2);
+		assertEquals(region4, region2);
+		assertEquals(region5, region2);
+	}
+
+	/**
+	 * Creates a JobGraph of the following form, where {@code |} marks a blocking result and all
+	 * other edges are pipelined (all edges are all-to-all):
+	 *
+	 * <pre>
+	 *     v2 ------->\
+	 *                 \
+	 *     v1---------> v4 --->|\
+	 *                          \
+	 *                           v5
+	 *                          /
+	 *     v3--------------->|/
+	 * </pre>
+	 */
+	@Test
+	public void testMultipleFailoverRegions() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 3);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 5);
+		JobVertex v5 = createVertex("vertex5", 2);
+
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(Arrays.asList(v1, v2, v3, v4, v5));
+
+		FailoverRegion region1 = getRegion(eg, v1, 1);
+		FailoverRegion region2 = getRegion(eg, v2, 0);
+		FailoverRegion region4 = getRegion(eg, v4, 3);
+		FailoverRegion region31 = getRegion(eg, v3, 0);
+		FailoverRegion region32 = getRegion(eg, v3, 1);
+		FailoverRegion region51 = getRegion(eg, v5, 0);
+		FailoverRegion region52 = getRegion(eg, v5, 1);
+
+		// there should be 5 failover regions: v1, v2 and v4 share one, each of the two v3 subtasks
+		// has its own, and each of the two v5 subtasks has its own
+		assertEquals(region1, region2);
+		assertEquals(region2, region4);
+		assertFalse(region31.equals(region32));
+		assertFalse(region51.equals(region52));
+	}
+
+	/**
+	 * Creates a JobGraph of the following form, where {@code |} marks a blocking result and all
+	 * other edges are pipelined (all edges are all-to-all):
+	 *
+	 * <pre>
+	 *     v1--->v2-->\
+	 *                 \
+	 *                  v4 --->|\
+	 *                 /         \
+	 *     v3------->/            v5
+	 *         \                 /
+	 *          --------------->/
+	 * </pre>
+	 *
+	 * <p>v5 reads v4 through a blocking result, but it also reads v3 through a pipelined result,
+	 * and v3 is pipelined into v4. So all subtasks are expected to form a single region.
+	 */
+	@Test
+	public void testSingleRegionWithMixedInput() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 3);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 5);
+		JobVertex v5 = createVertex("vertex5", 2);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(Arrays.asList(v1, v2, v3, v4, v5));
+
+		FailoverRegion region1 = getRegion(eg, v1, 1);
+		FailoverRegion region2 = getRegion(eg, v2, 0);
+		FailoverRegion region4 = getRegion(eg, v4, 3);
+		FailoverRegion region3 = getRegion(eg, v3, 0);
+		FailoverRegion region5 = getRegion(eg, v5, 1);
+
+		// all in one failover region
+		assertEquals(region1, region2);
+		assertEquals(region2, region4);
+		assertEquals(region3, region2);
+		assertEquals(region1, region5);
+	}
+
+	/**
+	 * Creates a JobGraph of the following form, where {@code |} marks a blocking all-to-all result
+	 * and v1 feeds v2 through a pipelined pointwise result:
+	 *
+	 * <pre>
+	 *     v1-->v2-->|\
+	 *                \
+	 *                 v4
+	 *                /
+	 *     v3----->|/
+	 * </pre>
+	 */
+	@Test
+	public void testMultiRegionNotAllToAll() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 2);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 5);
+		JobVertex v4 = createVertex("vertex4", 5);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(Arrays.asList(v1, v2, v3, v4));
+
+		FailoverRegion region11 = getRegion(eg, v1, 0);
+		FailoverRegion region12 = getRegion(eg, v1, 1);
+		FailoverRegion region21 = getRegion(eg, v2, 0);
+		FailoverRegion region22 = getRegion(eg, v2, 1);
+		FailoverRegion region3 = getRegion(eg, v3, 0);
+		FailoverRegion region4 = getRegion(eg, v4, 3);
+
+		// the pointwise pipelined edge pairs v1[i] with v2[i], giving two separate regions;
+		// v3 and v4 are connected only through a blocking result, so they do not share a region
+		assertEquals(region11, region21);
+		assertEquals(region12, region22);
+		assertFalse(region11.equals(region12));
+		assertFalse(region3.equals(region4));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a job vertex with the given name and parallelism that uses
+	 * {@link AbstractInvokable} as its invokable class.
+	 */
+	private static JobVertex createVertex(String name, int parallelism) {
+		JobVertex vertex = new JobVertex(name);
+		vertex.setParallelism(parallelism);
+		vertex.setInvokableClass(AbstractInvokable.class);
+		return vertex;
+	}
+
+	/**
+	 * Creates an execution graph configured with {@link RestartPipelinedRegionStrategy.Factory}
+	 * and a {@link NoRestartStrategy}, and attaches the given job vertices to it.
+	 *
+	 * @param orderedVertices the job vertices, sorted topologically from the sources
+	 * @return the execution graph with the vertices attached
+	 * @throws AssertionError if the vertices cannot be attached to the graph
+	 */
+	private static ExecutionGraph createExecutionGraph(List<JobVertex> orderedVertices) throws Exception {
+		ExecutionGraph eg = new ExecutionGraph(
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			new JobID(),
+			"Test Job Sample Name",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new RestartPipelinedRegionStrategy.Factory(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			new Scheduler(TestingUtils.defaultExecutor()),
+			ExecutionGraph.class.getClassLoader());
+
+		try {
+			eg.attachJobGraph(orderedVertices);
+		}
+		catch (JobException e) {
+			// keep the original exception as the cause so the test report shows where attaching failed
+			throw new AssertionError("Job failed with exception: " + e.getMessage(), e);
+		}
+
+		return eg;
+	}
+
+	/**
+	 * Looks up the failover region that the graph's {@link RestartPipelinedRegionStrategy}
+	 * assigned to the given subtask of the given job vertex.
+	 */
+	private static FailoverRegion getRegion(ExecutionGraph eg, JobVertex vertex, int subtaskIndex) {
+		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+		ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
+		return strategy.getFailoverRegion(ejv.getTaskVertices()[subtaskIndex]);
+	}
+}
[4/4] flink git commit: [hotfix] [runtime] Correct some JavaDocs for
RestartIndividualStrategy
Posted by se...@apache.org.
[hotfix] [runtime] Correct some JavaDocs for RestartIndividualStrategy
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e515b9bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e515b9bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e515b9bb
Branch: refs/heads/master
Commit: e515b9bb2e2c0b5aa9911dd0c91fb2016da3432f
Parents: dcfd37c
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Apr 25 15:42:48 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:05 2017 +0200
----------------------------------------------------------------------
.../executiongraph/failover/RestartIndividualStrategy.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e515b9bb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
index 0a449b8..0e7bca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -60,7 +60,7 @@ public class RestartIndividualStrategy extends FailoverStrategy {
private final SimpleCounter numTaskFailures;
/**
- * Creates a new failover strategy that recovers from failures by restarting all tasks
+ * Creates a new failover strategy that recovers from failures by restarting only the failed task
* of the execution graph.
*
* <p>The strategy will use the ExecutionGraph's future executor for callbacks.
@@ -72,7 +72,7 @@ public class RestartIndividualStrategy extends FailoverStrategy {
}
/**
- * Creates a new failover strategy that recovers from failures by restarting all tasks
+ * Creates a new failover strategy that recovers from failures by restarting only the failed task
* of the execution graph.
*
* @param executionGraph The execution graph to handle.