You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/03 18:07:51 UTC
[1/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction
for Failover Strategies to ExecutionGraph
Repository: flink
Updated Branches:
refs/heads/master 821ec80d7 -> 8ed85fe49
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
new file mode 100644
index 0000000..4dd55ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Random;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.*;
+
+public class GlobalModVersionTest {
+
+ /**
+ * Tests that failures during a global cancellation are not handed to the local
+ * failover strategy.
+ */
+ @Test
+ public void testNoLocalFailoverWhileCancelling() throws Exception {
+ final FailoverStrategy mockStrategy = mock(FailoverStrategy.class);
+
+ final ExecutionGraph graph = createSampleGraph(mockStrategy);
+
+ final ExecutionVertex testVertex = getRandomVertex(graph);
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1L, graph.getGlobalModVersion());
+
+ // wait until everything is running
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ waitUntilExecutionState(exec, ExecutionState.DEPLOYING, 1000);
+ exec.switchToRunning();
+ assertEquals(ExecutionState.RUNNING, exec.getState());
+ }
+
+ // now cancel the job
+ graph.cancel();
+ assertEquals(2L, graph.getGlobalModVersion());
+
+ // everything should be cancelling
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ assertEquals(ExecutionState.CANCELING, exec.getState());
+ }
+
+ // let a vertex fail
+ testVertex.getCurrentExecutionAttempt().fail(new Exception("test exception"));
+
+ // all cancellations are done now
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ exec.cancelingComplete();
+ }
+
+ assertEquals(JobStatus.CANCELED, graph.getState());
+
+ // no failure notification at all
+ verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));
+ }
+
+ /**
+ * Tests that failures during a global failover are not handed to the local
+ * failover strategy.
+ */
+ @Test
+ public void testNoLocalFailoverWhileFailing() throws Exception {
+ final FailoverStrategy mockStrategy = mock(FailoverStrategy.class);
+
+ final ExecutionGraph graph = createSampleGraph(mockStrategy);
+
+ final ExecutionVertex testVertex = getRandomVertex(graph);
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // wait until everything is running
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ waitUntilExecutionState(exec, ExecutionState.DEPLOYING, 1000);
+ exec.switchToRunning();
+ assertEquals(ExecutionState.RUNNING, exec.getState());
+ }
+
+ // now send the job into global failover
+ graph.failGlobal(new Exception("global failover"));
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(2L, graph.getGlobalModVersion());
+
+ // another attempt to fail global should not do anything
+ graph.failGlobal(new Exception("should be ignored"));
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(2L, graph.getGlobalModVersion());
+
+ // everything should be cancelling
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ assertEquals(ExecutionState.CANCELING, exec.getState());
+ }
+
+ // let a vertex fail
+ testVertex.getCurrentExecutionAttempt().fail(new Exception("test exception"));
+
+ // all cancellations are done now
+ for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ final Execution exec = v.getCurrentExecutionAttempt();
+ exec.cancelingComplete();
+ }
+
+ assertEquals(JobStatus.RESTARTING, graph.getState());
+
+ // no failure notification at all
+ verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private ExecutionGraph createSampleGraph(FailoverStrategy failoverStrategy) throws Exception {
+
+ final JobID jid = new JobID();
+ final int parallelism = new Random().nextInt(10) + 1;
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ // build a simple execution graph with one job vertex and a random parallelism (1 to 10)
+ final ExecutionGraph graph = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jid,
+ "test job",
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ Time.seconds(10),
+ new InfiniteDelayRestartStrategy(),
+ new CustomStrategy(failoverStrategy),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ slotProvider,
+ getClass().getClassLoader());
+
+ JobVertex jv = new JobVertex("test vertex");
+ jv.setInvokableClass(NoOpInvokable.class);
+ jv.setParallelism(parallelism);
+
+ JobGraph jg = new JobGraph(jid, "testjob", jv);
+ graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+ return graph;
+ }
+
+ private static ExecutionVertex getRandomVertex(ExecutionGraph eg) {
+ final ExecutionVertex[] vertices = eg.getVerticesTopologically().iterator().next().getTaskVertices();
+ return vertices[new Random().nextInt(vertices.length)];
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class CustomStrategy implements Factory {
+
+ private final FailoverStrategy failoverStrategy;
+
+ CustomStrategy(FailoverStrategy failoverStrategy) {
+ this.failoverStrategy = failoverStrategy;
+ }
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return failoverStrategy;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
new file mode 100644
index 0000000..713aece
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+
+import static org.junit.Assert.*;
+
+/**
+ * These tests make sure that global failover (restart all) always takes precedence over
+ * local recovery strategies.
+ *
+ * <p>This test must be in the package it resides in, because it uses package-private methods
+ * from the ExecutionGraph classes.
+ */
+public class IndividualRestartsConcurrencyTest {
+
+ /**
+ * Tests that a cancellation concurrent to a local failover leads to a properly
+ * cancelled state.
+ */
+ @Test
+ public void testCancelWhileInLocalFailover() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cancel the job to go into cancelling
+ // - resume in local recovery action
+ // - validate that this does in fact not start a new task, because the graph as a
+ // whole should now be cancelled already
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new IndividualFailoverWithCustomExecutor(executor),
+ slotProvider,
+ 2);
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now cancel the job
+ graph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue
+ executor.trigger();
+
+ // now report that cancelling is complete for the other vertex
+ vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+ assertEquals(JobStatus.CANCELED, graph.getState());
+ assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+ assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+ // make sure all slots are recycled
+ assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ /**
+ * Tests that a terminal global failure concurrent to a local failover
+ * leads to a properly failed state.
+ */
+ @Test
+ public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cause a global failure
+ // - resume in local recovery action
+ // - validate that this does in fact not start a new task, because the graph as a
+ // whole should now be terminally failed already
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new IndividualFailoverWithCustomExecutor(executor),
+ slotProvider,
+ 2);
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now send the job into a global failure
+ graph.failGlobal(new Exception("test exception"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue
+ executor.trigger();
+
+ // now report that cancelling is complete for the other vertex
+ vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+ assertEquals(JobStatus.FAILED, graph.getState());
+ assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+ assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+ // make sure all slots are recycled
+ assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ /**
+ * Tests that a local failover does not try to trump a global failover.
+ */
+ @Test
+ public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
+
+ // the logic in this test is as follows:
+ // - start a job
+ // - cause a task failure and delay the local recovery action via the manual executor
+ // - cause a global failure that is recovering immediately
+ // - resume in local recovery action
+ // - validate that this does in fact not cause another task restart, because the global
+ // recovery should already have restarted the task graph
+
+ final JobID jid = new JobID();
+ final int parallelism = 2;
+
+ final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+ final ExecutionGraph graph = createSampleGraph(
+ jid,
+ new IndividualFailoverWithCustomExecutor(executor),
+ new FixedDelayRestartStrategy(1, 0), // one restart, no delay
+ slotProvider,
+ 2);
+
+ final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+ final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+ final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+ graph.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ // let one of the vertices fail - that triggers a local recovery action
+ vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+ assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+
+ // graph should still be running and the failover recovery action should be queued
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(1, executor.numQueuedRunnables());
+
+ // now send the job into a global failure
+ graph.failGlobal(new Exception("test exception"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.CANCELING, vertex1.getCurrentExecutionAttempt().getState());
+
+ // now report that cancelling is complete for the other vertex
+ vertex1.getCurrentExecutionAttempt().cancelingComplete();
+
+ waitUntilJobStatus(graph, JobStatus.RUNNING, 1000);
+ assertEquals(JobStatus.RUNNING, graph.getState());
+
+ waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+ waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+ vertex1.getCurrentExecutionAttempt().switchToRunning();
+ vertex2.getCurrentExecutionAttempt().switchToRunning();
+ assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+
+ // let the recovery action continue - this should do nothing any more
+ executor.trigger();
+
+ // validate that the graph is still peachy
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+ assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+ assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+ assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+ assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size());
+
+ // make sure all slots are in use
+ assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private ExecutionGraph createSampleGraph(
+ JobID jid,
+ Factory failoverStrategy,
+ SlotProvider slotProvider,
+ int parallelism) throws Exception {
+
+ return createSampleGraph(jid, failoverStrategy, new NoRestartStrategy(), slotProvider, parallelism);
+ }
+
+ private ExecutionGraph createSampleGraph(
+ JobID jid,
+ Factory failoverStrategy,
+ RestartStrategy restartStrategy,
+ SlotProvider slotProvider,
+ int parallelism) throws Exception {
+
+ // build a simple execution graph with one job vertex and the given parallelism
+ final ExecutionGraph graph = new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ jid,
+ "test job",
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ Time.seconds(10),
+ restartStrategy,
+ failoverStrategy,
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ slotProvider,
+ getClass().getClassLoader());
+
+ JobVertex jv = new JobVertex("test vertex");
+ jv.setInvokableClass(NoOpInvokable.class);
+ jv.setParallelism(parallelism);
+
+ JobGraph jg = new JobGraph(jid, "testjob", jv);
+ graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+ return graph;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class IndividualFailoverWithCustomExecutor implements Factory {
+
+ private final Executor executor;
+
+ IndividualFailoverWithCustomExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return new RestartIndividualStrategy(executionGraph, executor);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
deleted file mode 100644
index c107d54..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A job status listener that waits lets one block until the job is in a terminal state.
- */
-public class TerminalJobStatusListener implements JobStatusListener {
-
- private final OneShotLatch terminalStateLatch = new OneShotLatch();
-
- public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
- terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
- if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
- terminalStateLatch.trigger();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
new file mode 100644
index 0000000..6e67c1a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+
+import java.util.UUID;
+
+/**
+ * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
+ * support any more advanced operations.
+ */
+public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
+
+ private final String address = UUID.randomUUID().toString();
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {}
+
+ @Override
+ public void stopCluster(ApplicationStatus applicationStatus, String message) {}
+
+ @Override
+ public Future<StackTrace> requestStackTrace(Time timeout) {
+ return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ }
+
+ @Override
+ public Future<StackTraceSampleResponse> requestStackTraceSample(
+ ExecutionAttemptID executionAttemptID,
+ int sampleId,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth,
+ Time timeout) {
+ return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ }
+
+ @Override
+ public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ return FlinkCompletableFuture.completed(Acknowledge.get());
+ }
+
+ @Override
+ public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return FlinkCompletableFuture.completed(Acknowledge.get());
+ }
+
+ @Override
+ public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return FlinkCompletableFuture.completed(Acknowledge.get());
+ }
+
+ @Override
+ public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ return FlinkCompletableFuture.completed(Acknowledge.get());
+ }
+
+ @Override
+ public void failPartition(ExecutionAttemptID executionAttemptID) {}
+
+ @Override
+ public void notifyCheckpointComplete(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp) {}
+
+ @Override
+ public void triggerCheckpoint(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) {}
+
+ @Override
+ public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+ return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ }
+
+ @Override
+ public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+ return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
new file mode 100644
index 0000000..2cf1eec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A testing utility slot provider that simply has a predefined pool of slots.
+ */
+public class SimpleSlotProvider implements SlotProvider, SlotOwner {
+
+    /** Free slots, handed out from the head; every access synchronizes on the deque. */
+    private final ArrayDeque<AllocatedSlot> slots;
+
+    /** Creates the pool with a new {@link SimpleAckingTaskManagerGateway} for all slots. */
+    public SimpleSlotProvider(JobID jobId, int numSlots) {
+        this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
+    }
+
+    /**
+     * Creates the pool; slot i gets its own loopback location built with the number 10000 + i.
+     *
+     * @param jobId the job ID passed to every created slot, must not be null
+     * @param numSlots the number of slots to pre-populate, must be {@code >= 0}
+     * @param taskManagerGateway the gateway passed to every created slot
+     */
+    public SimpleSlotProvider(JobID jobId, int numSlots, TaskManagerGateway taskManagerGateway) {
+        checkNotNull(jobId, "jobId");
+        checkArgument(numSlots >= 0, "numSlots must be >= 0");
+
+        this.slots = new ArrayDeque<>(numSlots);
+        for (int index = 0; index < numSlots; index++) {
+            final AllocationID allocationId = new AllocationID();
+            final TaskManagerLocation location = new TaskManagerLocation(
+                ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + index);
+            slots.add(new AllocatedSlot(
+                allocationId, jobId, location, 0, ResourceProfile.UNKNOWN, taskManagerGateway));
+        }
+    }
+
+    /** Hands out the first free slot, or a future failed with NoResourceAvailableException. */
+    @Override
+    public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+        final AllocatedSlot free;
+        synchronized (slots) {
+            free = slots.pollFirst();
+        }
+        if (free == null) {
+            return FlinkCompletableFuture.completedExceptionally(new NoResourceAvailableException());
+        }
+        return FlinkCompletableFuture.completed(new SimpleSlot(free, this, 0));
+    }
+
+    /** Appends the slot's underlying allocated slot to the pool again; always returns true. */
+    @Override
+    public boolean returnAllocatedSlot(Slot slot) {
+        final AllocatedSlot returned = slot.getAllocatedSlot();
+        synchronized (slots) {
+            slots.add(returned);
+        }
+        return true;
+    }
+
+    /** Returns how many slots are currently free in the pool. */
+    public int getNumberOfAvailableSlots() {
+        synchronized (slots) {
+            return slots.size();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6316bfd..195baa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -447,7 +447,7 @@ public class JobManagerTest extends TestLogger {
vertex.getCurrentExecutionAttempt().getAttemptId());
// Reset execution => new execution attempt
- vertex.resetForNewExecution();
+ vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
// Producer finished, request state
Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 49d0239..b1bb548 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -53,7 +52,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
private int numSlotsPerTM = 1;
private int parallelism = numTMs * numSlotsPerTM;
- private Configuration configuration;
private LeaderElectionRetrievalTestingCluster cluster = null;
private JobGraph job = createBlockingJob(parallelism);
@@ -61,7 +59,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
public void before() throws TimeoutException, InterruptedException {
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
- configuration = new Configuration();
+ Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
@@ -110,12 +108,9 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
- TerminalJobStatusListener testListener = new TerminalJobStatusListener();
- executionGraph.registerJobStatusListener(testListener);
-
cluster.revokeLeadership();
- testListener.waitForTerminalState(30000);
+ executionGraph.getTerminationFuture().get(30, TimeUnit.SECONDS);
}
public JobGraph createBlockingJob(int parallelism) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 7102f27..4019d63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -147,6 +148,7 @@ public class RescalePartitionerTest extends TestLogger {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
+ new RestartAllStrategy.Factory(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(),
new Scheduler(TestingUtils.defaultExecutionContext()),
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
new file mode 100644
index 0000000..07c7bd3
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayDeque;
+import java.util.concurrent.Executor;
+
+/**
+ * An executor that does not immediately execute a Runnable, but only executes it
+ * upon an explicit trigger.
+ *
+ * <p>This executor can be used in concurrent tests to control when certain asynchronous
+ * actions should happen.
+ */
+public class ManuallyTriggeredDirectExecutor implements Executor {
+
+    /** Pending runnables in submission order; every access synchronizes on the deque. */
+    private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
+
+    /** Only enqueues the command; it does not run before {@link #trigger()} is called. */
+    @Override
+    public void execute(@Nonnull Runnable command) {
+        synchronized (queuedRunnables) {
+            queuedRunnables.addLast(command);
+        }
+    }
+
+    /**
+     * Removes the oldest queued runnable and runs it synchronously in the calling thread.
+     *
+     * @throws IllegalStateException if no Runnable is currently queued
+     */
+    public void trigger() {
+        final Runnable next;
+
+        synchronized (queuedRunnables) {
+            // pollFirst() returns null when empty; removeFirst() would throw NoSuchElementException
+            next = queuedRunnables.pollFirst();
+        }
+
+        if (next == null) {
+            throw new IllegalStateException("No runnable available");
+        }
+        next.run();
+    }
+
+    /** Gets the number of Runnables currently queued. */
+    public int numQueuedRunnables() {
+        synchronized (queuedRunnables) {
+            return queuedRunnables.size();
+        }
+    }
+}
[3/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction
for Failover Strategies to ExecutionGraph
Posted by se...@apache.org.
[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph
- Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery
- Add base class for FailoverStrategy
- Add default implementation (restart all tasks)
- Add logic to load the failover strategy from the configuration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ed85fe4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ed85fe4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ed85fe4
Branch: refs/heads/master
Commit: 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec
Parents: e006127
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 21 19:13:34 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/JobManagerOptions.java | 7 +
.../java/org/apache/flink/util/StringUtils.java | 20 +
.../flink/runtime/execution/ExecutionState.java | 33 +-
.../flink/runtime/executiongraph/Execution.java | 56 ++-
.../runtime/executiongraph/ExecutionGraph.java | 315 +++++++++++----
.../executiongraph/ExecutionGraphBuilder.java | 10 +
.../executiongraph/ExecutionJobVertex.java | 172 +++-----
.../runtime/executiongraph/ExecutionVertex.java | 87 +++-
.../GlobalModVersionMismatch.java | 46 +++
.../failover/FailoverStrategy.java | 92 +++++
.../failover/FailoverStrategyLoader.java | 72 ++++
.../failover/RestartAllStrategy.java | 80 ++++
.../failover/RestartIndividualStrategy.java | 173 ++++++++
.../metrics/NumberOfFullRestartsGauge.java | 47 +++
.../flink/runtime/jobmaster/JobMaster.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 6 +-
.../checkpoint/CheckpointCoordinatorTest.java | 1 +
.../checkpoint/CoordinatorShutdownTest.java | 4 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 4 +-
.../ExecutionGraphDeploymentTest.java | 114 ++----
.../ExecutionGraphMetricsTest.java | 12 +-
.../ExecutionGraphRestartTest.java | 33 +-
.../ExecutionGraphSchedulingTest.java | 9 +-
.../ExecutionGraphSignalsTest.java | 399 -------------------
.../executiongraph/ExecutionGraphStopTest.java | 176 ++++++++
.../ExecutionGraphSuspendTest.java | 307 ++++++++++++++
.../executiongraph/ExecutionGraphTestUtils.java | 263 ++++++++++--
.../ExecutionGraphVariousFailuesTest.java | 118 ++++++
.../ExecutionVertexCancelTest.java | 45 +--
.../ExecutionVertexLocalityTest.java | 2 +-
.../executiongraph/ExecutionVertexStopTest.java | 129 ------
.../executiongraph/FinalizeOnMasterTest.java | 96 +++++
.../executiongraph/GlobalModVersionTest.java | 212 ++++++++++
.../IndividualRestartsConcurrencyTest.java | 332 +++++++++++++++
.../TerminalJobStatusListener.java | 45 ---
.../utils/SimpleAckingTaskManagerGateway.java | 121 ++++++
.../utils/SimpleSlotProvider.java | 106 +++++
.../runtime/jobmanager/JobManagerTest.java | 2 +-
.../LeaderChangeJobRecoveryTest.java | 9 +-
.../partitioner/RescalePartitionerTest.java | 2 +
.../ManuallyTriggeredDirectExecutor.java | 70 ++++
41 files changed, 2808 insertions(+), 1021 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 10d9e16..5481d7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -73,6 +73,13 @@ public class JobManagerOptions {
.withDeprecatedKeys("job-manager.max-attempts-history-size");
/**
+ * The name of the failover strategy used to recover from task failures (defaults to "full").
+ */
+ public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+ key("jobmanager.execution.failover-strategy")
+ .defaultValue("full");
+
+ /**
* This option specifies the interval in order to trigger a resource manager reconnection if the connection
* to the resource manager has been lost.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index abd6ba6..6638062 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -330,6 +330,26 @@ public final class StringUtils {
return true;
}
+ /**
+ * If both string arguments are non-null, this method concatenates them with ' and '.
+ * If only one of the arguments is non-null, this method returns the non-null argument.
+ * If both arguments are null, this method returns null.
+ *
+ * @param s1 The first string argument
+ * @param s2 The second string argument
+ *
+ * @return The concatenated string, or non-null argument, or null
+ */
+ @Nullable
+ public static String concatenateWithAnd(@Nullable String s1, @Nullable String s2) {
+ if (s1 != null) {
+ return s2 == null ? s1 : s1 + " and " + s2;
+ }
+ else {
+ return s2 != null ? s2 : null;
+ }
+ }
+
// ------------------------------------------------------------------------
/** Prevent instantiation of this utility class */
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index d6ff0cd..53ca8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -25,27 +25,27 @@ package org.apache.flink.runtime.execution;
* <pre>{@code
*
* CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
- * | | | |
- * | | | +------+
- * | | V V
- * | | CANCELLING -----+----> CANCELED
- * | | |
- * | +-------------------------+
- * |
- * | ... -> FAILED
- * V
+ * | | | |
+ * | | | +------+
+ * | | V V
+ * | | CANCELLING -----+----> CANCELED
+ * | | |
+ * | +-------------------------+
+ * |
+ * | ... -> FAILED
+ * V
* RECONCILING -> RUNNING | FINISHED | CANCELED | FAILED
*
* }</pre>
*
* <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
* state if job manager fail over, and the {@code RECONCILING} state can switch into
- * any existing task state.</p>
+ * any existing task state.
*
- * <p>It is possible to enter the {@code FAILED} state from any other state.</p>
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
*
* <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
- * considered terminal states.</p>
+ * considered terminal states.
*/
public enum ExecutionState {
@@ -56,7 +56,14 @@ public enum ExecutionState {
DEPLOYING,
RUNNING,
-
+
+ /**
+ * This state marks "successfully completed". It can only be reached when a
+ * program reaches the "end of its input". The "end of input" can be reached
+ * when consuming a bounded input (fixed set of files, bounded query, etc.) or
+ * when stopping a program (not cancelling!) which makes the input look like
+ * it reached its end at a specific point.
+ */
FINISHED,
CANCELING,
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2680849..c0f1f39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -74,12 +74,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
- * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
- * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
+ * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
+ * (for recovery, re-computation, re-configuration), this class tracks the state of a single execution
+ * of that vertex and the resources.
*
- * <p>NOTE ABOUT THE DESIGN RATIONAL:
+ * <h2>Lock free state transitions</h2>
*
- * <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * In several points of the code, we need to deal with possible concurrent state changes and actions.
* For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
*
* <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
@@ -113,6 +114,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
/** The unique ID marking the specific execution instant of the task */
private final ExecutionAttemptID attemptId;
+ /** The global modification version of the execution graph when this execution was created.
+ * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
+ * to resolve conflicts between concurrent modification by global and local failover actions. */
+ private final long globalModVersion;
+
+ /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()} */
private final long[] stateTimestamps;
private final int attemptNumber;
@@ -146,10 +153,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
+ /**
+ * Creates a new Execution attempt.
+ *
+ * @param executor
+ * The executor used to dispatch callbacks from futures and asynchronous RPC calls.
+ * @param vertex
+ * The execution vertex to which this Execution belongs
+ * @param attemptNumber
+ * The execution attempt number.
+ * @param globalModVersion
+ * The global modification version of the execution graph when this execution was created
+ * @param startTimestamp
+ * The timestamp that marks the creation of this Execution
+ * @param timeout
+ * The timeout for RPC calls like deploy/cancel/stop.
+ */
public Execution(
Executor executor,
ExecutionVertex vertex,
int attemptNumber,
+ long globalModVersion,
long startTimestamp,
Time timeout) {
@@ -158,6 +182,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
this.attemptId = new ExecutionAttemptID();
this.timeout = checkNotNull(timeout);
+ this.globalModVersion = globalModVersion;
this.attemptNumber = attemptNumber;
this.stateTimestamps = new long[ExecutionState.values().length];
@@ -190,6 +215,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return state;
}
+ /**
+ * Gets the global modification version that the execution graph had when this execution was
+ * created. The ExecutionGraph bumps this version whenever a global failover happens; it is used
+ * to resolve conflicts between concurrent modifications by global and local failover actions.
+ * @return the global modification version that was passed to this execution's constructor
+ */
+ public long getGlobalModVersion() {
+ return globalModVersion;
+ }
+
public SimpleSlot getAssignedResource() {
return assignedResource;
}
@@ -252,6 +287,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// Actions
// --------------------------------------------------------------------------------------------
+ /** Schedules this execution with the slot provider and queued-scheduling flag of its execution graph. */
+ public boolean scheduleForExecution() {
+     final ExecutionGraph graph = getVertex().getExecutionGraph();
+     return scheduleForExecution(graph.getSlotProvider(), graph.isQueuedSchedulingAllowed());
+ }
+
/**
* NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
* to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
@@ -381,9 +422,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
taskState,
attemptNumber);
- // register this execution at the execution graph, to receive call backs
- vertex.getExecutionGraph().registerExecution(this);
-
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
@@ -823,7 +861,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (current != FAILED) {
String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
LOG.error(message);
- vertex.getExecutionGraph().fail(new Exception(message));
+ vertex.getExecutionGraph().failGlobal(new Exception(message));
}
return;
}
@@ -1069,7 +1107,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// make sure that the state transition completes normally.
// potential errors (in listeners may not affect the main logic)
try {
- vertex.notifyStateTransition(attemptId, targetState, error);
+ vertex.notifyStateTransition(this, targetState, error);
}
catch (Throwable t) {
LOG.error("Error while notifying execution graph of execution state transition.", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fff1ea2..5eaa637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.commons.lang3.StringUtils;
-
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -42,11 +40,15 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,10 +67,12 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +95,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -100,9 +105,9 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* The execution graph is the central data structure that coordinates the distributed
* execution of a data flow. It keeps representations of each parallel task, each
- * intermediate result, and the communication between them.
+ * intermediate stream, and the communication between them.
*
- * The execution graph consists of the following constructs:
+ * <p>The execution graph consists of the following constructs:
* <ul>
* <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
* "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
@@ -118,12 +123,41 @@ import static org.apache.flink.util.Preconditions.checkState;
* about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
* address the message receiver.</li>
* </ul>
+ *
+ * <h2>Global and local failover</h2>
+ *
+ * The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
+ *
+ * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts the whole
+ * data flow graph from the last completed checkpoint. Global failover is considered the
+ * "fallback strategy" that is used when a local failover is unsuccessful, or when an issue is
+ * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
+ *
+ * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
+ * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
+ * attempts to restart as little as possible, but as much as necessary.
+ *
+ * <p>Between local- and global failover, the global failover always takes precedence, because it
+ * is the core mechanism that the ExecutionGraph relies on to bring back consistency. To
+ * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
+ * with every global failover (and other global actions, like job cancellation, or terminal
+ * failure). Local failover is always scoped by the modification version that the execution graph
+ * had when the failover was triggered. If a new global modification version is reached during
+ * local failover (meaning there is a concurrent global failover), the failover strategy has to
+ * yield before the global failover.
*/
public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
+ /** In place updater for the execution graph's current state. Avoids having to use an
+ * AtomicReference and thus makes the frequent read access a bit faster */
private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
+ /** In place updater for the execution graph's current global recovery version.
+ * Avoids having to use an AtomicLong and thus makes the frequent read access a bit faster */
+ private static final AtomicLongFieldUpdater<ExecutionGraph> GLOBAL_VERSION_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ExecutionGraph.class, "globalModVersion");
+
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
@@ -169,6 +203,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Listeners that receive messages whenever a single task execution changes its status */
private final List<ExecutionStatusListener> executionListeners;
+ /** The implementation that decides how to recover the failures of tasks */
+ private final FailoverStrategy failoverStrategy;
+
/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
* the execution graph transitioned into a certain state. The index into this array is the
* ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
@@ -178,6 +215,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The timeout for all messages that require a response/acknowledgement */
private final Time rpcCallTimeout;
+ /** The timeout for bulk slot allocation (eager scheduling mode). After this timeout,
+ * slots are released and a recovery is triggered */
+ private final Time scheduleAllocationTimeout;
+
/** Strategy to use for restarts */
private final RestartStrategy restartStrategy;
@@ -190,6 +231,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Registered KvState instances reported by the TaskManagers. */
private final KvStateLocationRegistry kvStateLocationRegistry;
+ /** The total number of vertices currently in the execution graph */
private int numVerticesTotal;
// ------ Configuration of the Execution -------
@@ -203,8 +245,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
- private final Time scheduleAllocationTimeout;
-
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
private final AtomicInteger verticesFinished;
@@ -212,6 +252,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Current status of the job execution */
private volatile JobStatus state = JobStatus.CREATED;
+ /** A future that completes once the job has reached a terminal state */
+ private volatile CompletableFuture<JobStatus> terminationFuture;
+
+ /** On each global recovery, this version is incremented. The version breaks conflicts
+ * between concurrent restart attempts by local failover strategies */
+ private volatile long globalModVersion;
+
/** The exception that caused the job to fail. This is set to the first root exception
* that was not recoverable and triggered job failure */
private volatile Throwable failureCause;
@@ -233,7 +280,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// --------------------------------------------------------------------------------------------
/**
- * This constructor is for tests only, because it does not include class loading information.
+ * This constructor is for tests only, because it sets default values for many fields.
*/
@VisibleForTesting
ExecutionGraph(
@@ -255,6 +302,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
serializedConfig,
timeout,
restartStrategy,
+ new RestartAllStrategy.Factory(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
slotProvider,
@@ -270,6 +318,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
+ FailoverStrategy.Factory failoverStrategyFactory,
List<BlobKey> requiredJarFiles,
List<URL> requiredClasspaths,
SlotProvider slotProvider,
@@ -322,6 +371,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
this.verticesFinished = new AtomicInteger();
+
+ this.globalModVersion = 1L;
+
+ // the failover strategy must be instantiated last, so that the execution graph
+ // is ready by the time the failover strategy sees it
+ this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
+ LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}
// --------------------------------------------------------------------------------------------
@@ -541,6 +597,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return failureCause;
}
+ /**
+ * Gets the number of full restarts that the execution graph went through.
+ * If a full restart recovery is currently pending, this recovery is included in the
+ * count.
+ *
+ * @return The number of full restarts so far
+ */
+ public long getNumberOfFullRestarts() {
+ // subtract one, because the version starts at one
+ return globalModVersion - 1;
+ }
+
@Override
public String getFailureCauseAsString() {
return ExceptionUtils.stringifyException(failureCause);
@@ -689,11 +757,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// --------------------------------------------------------------------------------------------
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
- + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
- }
+ LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
+ "vertices and {} intermediate results.",
+ topologiallySorted.size(), tasks.size(), intermediateResults.size());
+
+ final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
final long createTimestamp = System.currentTimeMillis();
for (JobVertex jobVertex : topologiallySorted) {
@@ -704,7 +773,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv =
- new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
+ new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -723,7 +792,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.verticesInCreationOrder.add(ejv);
this.numVerticesTotal += ejv.getParallelism();
+ newExecJobVertices.add(ejv);
}
+
+ terminationFuture = new FlinkCompletableFuture<>();
+ failoverStrategy.notifyNewVertices(newExecJobVertices);
}
public void scheduleForExecution() throws JobException {
@@ -856,7 +929,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ExecutionGraph notices
// we need to go into recovery and make sure to release all slots
try {
- fail(t);
+ failGlobal(t);
}
finally {
ExecutionGraphUtils.releaseAllSlotsSilently(resources);
@@ -889,6 +962,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) {
+ // make sure no concurrent local actions interfere with the cancellation
+ incrementGlobalModVersion();
+
final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
// cancel all tasks (that still need cancelling)
@@ -920,8 +996,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
else if (current == JobStatus.RESTARTING) {
synchronized (progressLock) {
if (transitionState(current, JobStatus.CANCELED)) {
- postRunCleanup();
- progressLock.notifyAll();
+ onTerminalState(JobStatus.CANCELED);
LOG.info("Canceled during restart.");
return;
@@ -951,7 +1026,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* Suspends the current ExecutionGraph.
*
* The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
- * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+ * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
*
* The SUSPENDED state is a local terminal state which stops the execution of the job but does
* not remove the job from the HA job store so that it can be recovered by another JobManager.
@@ -962,21 +1037,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
while (true) {
JobStatus currentState = state;
- if (currentState.isGloballyTerminalState()) {
+ if (currentState.isTerminalState()) {
// stay in a terminal state
return;
} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
this.failureCause = suspensionCause;
+ // make sure no concurrent local actions interfere with the cancellation
+ incrementGlobalModVersion();
+
for (ExecutionJobVertex ejv: verticesInCreationOrder) {
ejv.cancel();
}
synchronized (progressLock) {
- postRunCleanup();
- progressLock.notifyAll();
+ onTerminalState(JobStatus.SUSPENDED);
- LOG.info("Job {} has been suspended.", getJobID());
+ LOG.info("Job {} has been suspended.", getJobID());
}
return;
@@ -984,7 +1061,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- public void fail(Throwable t) {
+ /**
+ * Fails the execution graph globally. This failure will not be recovered by a specific
+ * failover strategy, but results in a full restart of all tasks.
+ *
+ * <p>This global failure is meant to be triggered in cases where the consistency of the
+ * execution graph's state cannot be guaranteed any more (for example when catching unexpected
+ * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
+ * safe way to get consistency back.
+ *
+ * @param t The exception that caused the failure.
+ */
+ public void failGlobal(Throwable t) {
while (true) {
JobStatus current = state;
// stay in these states
@@ -1003,6 +1091,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
+ // make sure no concurrent local actions interfere with the cancellation
+ incrementGlobalModVersion();
+
// we build a future that is complete once all vertices have reached a terminal state
final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
@@ -1044,23 +1135,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
throw new IllegalStateException("Can only restart job from state restarting.");
}
- if (slotProvider == null) {
- throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
- }
-
this.currentExecutions.clear();
- Collection<CoLocationGroup> colGroups = new HashSet<>();
+ final Collection<CoLocationGroup> colGroups = new HashSet<>();
+ final long resetTimestamp = System.currentTimeMillis();
for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
CoLocationGroup cgroup = jv.getCoLocationGroup();
- if(cgroup != null && !colGroups.contains(cgroup)){
+ if (cgroup != null && !colGroups.contains(cgroup)){
cgroup.resetConstraints();
colGroups.add(cgroup);
}
- jv.resetForNewExecution();
+ jv.resetForNewExecution(resetTimestamp, globalModVersion);
}
for (int i = 0; i < stateTimestamps.length; i++) {
@@ -1083,7 +1171,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
catch (Throwable t) {
LOG.warn("Failed to restart the job.", t);
- fail(t);
+ failGlobal(t);
}
}
@@ -1124,32 +1212,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return null;
}
+ @VisibleForTesting
+ public Future<JobStatus> getTerminationFuture() {
+ return terminationFuture;
+ }
+
+ @VisibleForTesting
+ public JobStatus waitUntilTerminal() throws InterruptedException {
+ try {
+ return terminationFuture.get();
+ }
+ catch (ExecutionException e) {
+ // this should never happen
+ // it would be a bug, so we don't expect this to be handled and throw
+ // an unchecked exception here
+ throw new RuntimeException(e);
+ }
+ }
+
/**
- * For testing: This waits until the job execution has finished.
+ * Gets the failover strategy used by the execution graph to recover from failures of tasks.
*/
- public void waitUntilFinished() throws InterruptedException {
- // we may need multiple attempts in the presence of failures / recovery
- while (true) {
- for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- for (ExecutionVertex ev : ejv.getTaskVertices()) {
- try {
- ev.getCurrentExecutionAttempt().getTerminationFuture().get();
- }
- catch (ExecutionException e) {
- // this should never happen
- throw new RuntimeException(e);
- }
- }
- }
+ public FailoverStrategy getFailoverStrategy() {
+ return this.failoverStrategy;
+ }
- // now that all vertices have been (at some point) in a terminal state,
- // we need to check if the job as a whole has entered a final state
- if (state.isTerminalState()) {
- return;
- }
- }
+ /**
+ * Gets the current global modification version of the ExecutionGraph.
+ * The global modification version is incremented with each global action (cancel/fail/restart)
+ * and is used to disambiguate concurrent modifications between local and global
+ * failover actions.
+ */
+ long getGlobalModVersion() {
+ return globalModVersion;
}
+ // ------------------------------------------------------------------------
+ // State Transitions
+ // ------------------------------------------------------------------------
+
private boolean transitionState(JobStatus current, JobStatus newState) {
return transitionState(current, newState, null);
}
@@ -1175,11 +1276,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
+ private long incrementGlobalModVersion() {
+ return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
+ }
+
+ // ------------------------------------------------------------------------
+ // Job Status Progress
+ // ------------------------------------------------------------------------
+
+ /**
+ * Called whenever a vertex reaches state FINISHED (completed successfully).
+ * Once all vertices are in the FINISHED state, the program is successfully done.
+ */
void vertexFinished() {
- int numFinished = verticesFinished.incrementAndGet();
+ final int numFinished = verticesFinished.incrementAndGet();
if (numFinished == numVerticesTotal) {
// done :-)
- allVerticesInTerminalState();
+
+ // check whether we are still in "RUNNING" and trigger the final cleanup
+ if (state == JobStatus.RUNNING) {
+ // we do the final cleanup in the I/O executor, because it may involve
+ // some heavier work
+
+ try {
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+ }
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalError(t);
+ failGlobal(new Exception("Failed to finalize execution on master", t));
+ return;
+ }
+
+ // if we do not make this state transition, then a concurrent
+ // cancellation or failure happened
+ if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
+ onTerminalState(JobStatus.FINISHED);
+ }
+ }
}
}
@@ -1187,6 +1322,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
verticesFinished.getAndDecrement();
}
+ /**
+ * This method is a callback during cancellation/failover and called when all tasks
+ * have reached a terminal state (cancelled/failed/finished).
+ */
private void allVerticesInTerminalState() {
// we are done, transition to the final state
JobStatus current;
@@ -1194,14 +1333,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
current = this.state;
if (current == JobStatus.RUNNING) {
- if (transitionState(current, JobStatus.FINISHED)) {
- postRunCleanup();
- break;
- }
+ failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
}
else if (current == JobStatus.CANCELLING) {
if (transitionState(current, JobStatus.CANCELED)) {
- postRunCleanup();
+ onTerminalState(JobStatus.CANCELED);
break;
}
}
@@ -1221,7 +1357,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
break;
}
else {
- fail(new Exception("ExecutionGraph went into final state from state " + current));
+ failGlobal(new Exception("ExecutionGraph went into final state from state " + current));
break;
}
}
@@ -1255,18 +1391,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
restartStrategy.restart(this);
return true;
- } else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
- final List<String> reasonsForNoRestart = new ArrayList<>(2);
- if (!isFailureCauseAllowingRestart) {
- reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
- }
- if (!isRestartStrategyAllowingRestart) {
- reasonsForNoRestart.add("the restart strategy prevented it");
- }
+ }
+ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
+ final String cause1 = isFailureCauseAllowingRestart ? null :
+ "a type of SuppressRestartsException was thrown";
+ final String cause2 = isRestartStrategyAllowingRestart ? null :
+ "the restart strategy prevented it";
LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
- StringUtils.join(reasonsForNoRestart, " and "), failureCause);
- postRunCleanup();
+ StringUtils.concatenateWithAnd(cause1, cause2), failureCause);
+ onTerminalState(JobStatus.FAILED);
return true;
} else {
@@ -1280,16 +1414,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- private void postRunCleanup() {
+ private void onTerminalState(JobStatus status) {
try {
CheckpointCoordinator coord = this.checkpointCoordinator;
this.checkpointCoordinator = null;
if (coord != null) {
- coord.shutdown(state);
+ coord.shutdown(status);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
LOG.error("Error while cleaning up after execution", e);
}
+ finally {
+ terminationFuture.complete(status);
+ }
}
// --------------------------------------------------------------------------------------------
@@ -1343,7 +1481,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
// failures during updates leave the ExecutionGraph inconsistent
- fail(t);
+ failGlobal(t);
return false;
}
}
@@ -1405,7 +1543,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
void registerExecution(Execution exec) {
Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
if (previous != null) {
- fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
+ failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
}
}
@@ -1413,7 +1551,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
Execution contained = currentExecutions.remove(exec.getAttemptId());
if (contained != null && contained != exec) {
- fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
+ failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
}
}
@@ -1471,21 +1609,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
void notifyExecutionChange(
- JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
- ExecutionState newExecutionState, Throwable error)
- {
- ExecutionJobVertex vertex = getJobVertex(vertexId);
+ final Execution execution,
+ final ExecutionState newExecutionState,
+ final Throwable error) {
if (executionListeners.size() > 0) {
+ final ExecutionJobVertex vertex = execution.getVertex().getJobVertex();
final String message = error == null ? null : ExceptionUtils.stringifyException(error);
final long timestamp = System.currentTimeMillis();
for (ExecutionStatusListener listener : executionListeners) {
try {
listener.executionStatusChanged(
- getJobID(), vertexId, vertex.getJobVertex().getName(),
- vertex.getParallelism(), subtask, executionID, newExecutionState,
- timestamp, message);
+ getJobID(), vertex.getJobVertexId(), vertex.getJobVertex().getName(),
+ vertex.getParallelism(), execution.getParallelSubtaskIndex(),
+ execution.getAttemptId(), newExecutionState, timestamp, message);
} catch (Throwable t) {
LOG.warn("Error while notifying ExecutionStatusListener", t);
}
@@ -1494,7 +1632,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
- fail(error);
+ final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
+
+ // by filtering out late failure calls, we can save some work in
+ // avoiding redundant local failover
+ if (execution.getGlobalModVersion() == globalModVersion) {
+ try {
+ failoverStrategy.onTaskFailure(execution, ex);
+ }
+ catch (Throwable t) {
+ // bug in the failover strategy - fall back to global failover
+ LOG.warn("Error in failover strategy - falling back to global restart", t);
+ failGlobal(ex);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index b40817f..88863e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -34,7 +34,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -90,6 +93,9 @@ public class ExecutionGraphBuilder {
final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();
+ final FailoverStrategy.Factory failoverStrategy =
+ FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
+
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph = (prior != null) ? prior :
new ExecutionGraph(
@@ -101,6 +107,7 @@ public class ExecutionGraphBuilder {
jobGraph.getSerializedExecutionConfig(),
timeout,
restartStrategy,
+ failoverStrategy,
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths(),
slotProvider,
@@ -269,6 +276,9 @@ public class ExecutionGraphBuilder {
metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
+ metrics.gauge(NumberOfFullRestartsGauge.METRIC_NAME, new NumberOfFullRestartsGauge(executionGraph));
+
+ executionGraph.getFailoverStrategy().registerMetrics(metrics);
return executionGraph;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3197e65..3a98e0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -28,7 +29,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.Future;
@@ -58,6 +58,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer
+ * to the {@link JobVertex}.
+ *
+ * <p>The {@code ExecutionJobVertex} corresponds to a parallelized operation. It
+ * contains an {@link ExecutionVertex} for each parallel instance of that operation.
+ */
public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
/** Use the same log for all ExecutionGraph classes */
@@ -115,21 +122,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private SerializedValue<TaskInformation> serializedTaskInformation;
private InputSplitAssigner splitAssigner;
-
- public ExecutionJobVertex(
+
+ /**
+ * Convenience constructor for testing.
+ */
+ @VisibleForTesting
+ ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Time timeout) throws JobException {
- this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
+ this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
}
-
+
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Time timeout,
+ long initialGlobalModVersion,
long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
@@ -190,18 +202,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(
- this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+ this,
+ i,
+ producedDataSets,
+ timeout,
+ initialGlobalModVersion,
+ createTimestamp,
+ maxPriorAttemptsHistoryLength);
this.taskVertices[i] = vertex;
}
-
+
// sanity check for the double referencing between intermediate result partitions and execution vertices
for (IntermediateResult ir : this.producedDataSets) {
if (ir.getNumberOfAssignedPartitions() != parallelism) {
throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
}
}
-
+
// set up the input splits, if the vertex has any
try {
@SuppressWarnings("unchecked")
@@ -508,7 +526,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
- public void resetForNewExecution() {
+ public void resetForNewExecution(final long timestamp, final long expectedGlobalModVersion)
+ throws GlobalModVersionMismatch {
synchronized (stateMonitor) {
// check and reset the sharing groups with scheduler hints
@@ -517,7 +536,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
for (int i = 0; i < parallelism; i++) {
- taskVertices[i].resetForNewExecution();
+ taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion);
}
// set up the input splits again
@@ -558,112 +577,36 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
// --------------------------------------------------------------------------------------------
- // Static / pre-assigned input splits
+ // Archiving
// --------------------------------------------------------------------------------------------
- private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
-
- final int numSubTasks = getParallelism();
-
- // sanity check
- if (numSubTasks > splits.length) {
- throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
- }
-
- // group the splits by host while preserving order per host
- Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
-
- for (InputSplit split : splits) {
- // check that split has exactly one local host
- if(!(split instanceof LocatableInputSplit)) {
- throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
- "Strictly local assignment requires LocatableInputSplit");
- }
- LocatableInputSplit lis = (LocatableInputSplit) split;
-
- if (lis.getHostnames() == null) {
- throw new JobException("LocatableInputSplit has no host information. " +
- "Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
- }
- else if (lis.getHostnames().length != 1) {
- throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
- }
- String hostName = lis.getHostnames()[0];
-
- if (hostName == null) {
- throw new JobException("For strictly local input split assignment, no null host names are allowed.");
- }
-
- List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
- if (hostSplits == null) {
- hostSplits = new ArrayList<LocatableInputSplit>();
- splitsByHost.put(hostName, hostSplits);
- }
- hostSplits.add(lis);
- }
-
-
- int numHosts = splitsByHost.size();
-
- if (numSubTasks < numHosts) {
- throw new JobException("Strictly local split assignment requires at least as " +
- "many parallel subtasks as distinct split hosts. Please increase the parallelism " +
- "of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
- }
-
- // get list of hosts in deterministic order
- List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
- Collections.sort(hosts);
-
- @SuppressWarnings("unchecked")
- List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List<?>[numSubTasks];
-
- final int subtasksPerHost = numSubTasks / numHosts;
- final int hostsWithOneMore = numSubTasks % numHosts;
-
- int subtaskNum = 0;
-
- // we go over all hosts and distribute the hosts' input splits
- // over the subtasks
- for (int hostNum = 0; hostNum < numHosts; hostNum++) {
- String host = hosts.get(hostNum);
- List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);
-
- int numSplitsOnHost = splitsOnHost.size();
-
- // the number of subtasks to split this over.
- // NOTE: if the host has few splits, some subtasks will not get anything.
- int subtasks = Math.min(numSplitsOnHost,
- hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
-
- int splitsPerSubtask = numSplitsOnHost / subtasks;
- int subtasksWithOneMore = numSplitsOnHost % subtasks;
-
- int splitnum = 0;
-
- // go over the subtasks and grab a subrange of the input splits
- for (int i = 0; i < subtasks; i++) {
- int numSplitsForSubtask = (i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask);
-
- List<LocatableInputSplit> splitList;
-
- if (numSplitsForSubtask == numSplitsOnHost) {
- splitList = splitsOnHost;
- }
- else {
- splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
- for (int k = 0; k < numSplitsForSubtask; k++) {
- splitList.add(splitsOnHost.get(splitnum++));
- }
- }
-
- subTaskSplitAssignment[subtaskNum++] = splitList;
- }
- }
-
- return subTaskSplitAssignment;
+ @Override
+ public ArchivedExecutionJobVertex archive() {
+ return new ArchivedExecutionJobVertex(this);
}
+ // ------------------------------------------------------------------------
+ // Static Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * A utility function that computes an "aggregated" state for the vertex.
+ *
+ * <p>This state is not used anywhere in the coordination, but can be used for display
+ * in dashboards as a summary for how the particular parallel operation represented by
+ * this ExecutionJobVertex is currently behaving.
+ *
+ * <p>For example, if at least one parallel task is failed, the aggregate state is failed.
+ * If not, and at least one parallel task is cancelling (or cancelled), the aggregate state
+ * is cancelling (or cancelled). If all tasks are finished, the aggregate state is finished,
+ * and so on.
+ *
+ * @param verticesPerState The number of vertices in each state (indexed by the ordinal of
+ * the ExecutionState values).
+ * @param parallelism The parallelism of the ExecutionJobVertex
+ *
+ * @return The aggregate state of this ExecutionJobVertex.
+ */
public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
@@ -739,9 +682,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return expanded;
}
-
- @Override
- public ArchivedExecutionJobVertex archive() {
- return new ArchivedExecutionJobVertex(this);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index bcf7a7c..e8c1984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
@@ -97,7 +98,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// --------------------------------------------------------------------------------------------
- public ExecutionVertex(
+ /**
+ * Convenience constructor for tests. Sets various fields to default values.
+ */
+ @VisibleForTesting
+ ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
@@ -108,24 +113,28 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
subTaskIndex,
producedDataSets,
timeout,
+ 1L,
System.currentTimeMillis(),
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
}
+ /**
+ * Creates an ExecutionVertex and initializes its first Execution.
+ * @param timeout
+ * The RPC timeout to use for deploy / cancel calls
+ * @param initialGlobalModVersion
+ * The global modification version to initialize the first Execution with.
+ * @param createTimestamp
+ * The timestamp for the vertex creation, used to initialize the first Execution with.
+ * @param maxPriorExecutionHistoryLength
+ * The number of prior Executions (= execution attempts) to keep.
+ */
public ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
- int maxPriorExecutionHistoryLength) {
- this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
- }
-
- public ExecutionVertex(
- ExecutionJobVertex jobVertex,
- int subTaskIndex,
- IntermediateResult[] producedDataSets,
- Time timeout,
+ long initialGlobalModVersion,
long createTimestamp,
int maxPriorExecutionHistoryLength) {
@@ -151,6 +160,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
getExecutionGraph().getFutureExecutor(),
this,
0,
+ initialGlobalModVersion,
createTimestamp,
timeout);
@@ -163,6 +173,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.locationConstraint = null;
}
+ getExecutionGraph().registerExecution(currentExecution);
+
this.timeout = timeout;
}
@@ -508,11 +520,40 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Actions
// --------------------------------------------------------------------------------------------
- public Execution resetForNewExecution() {
-
+ /**
+ * Archives the current Execution and creates a new Execution for this vertex.
+ *
+ * <p>This method atomically checks if the ExecutionGraph is still of an expected
+ * global mod. version and replaces the execution if that is the case. If the ExecutionGraph
+ * has increased its global mod. version in the meantime, this operation fails.
+ *
+ * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
+ * reconfiguration actions in a similar way as "optimistic concurrency control".
+ *
+ * @param timestamp
+ * The creation timestamp for the new Execution
+ * @param originatingGlobalModVersion
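+ * The global mod. version of the ExecutionGraph at the time the action that caused this reset was triggered.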
+ *
+ * @return The newly created Execution.
+ *
+ * @throws GlobalModVersionMismatch Thrown if the execution graph has a newer global mod.
+ * version than the one passed to this method.
+ */
+ public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
+ throws GlobalModVersionMismatch
+ {
LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) {
+ // check if another global modification has been triggered since the
+ // action that originally caused this reset/restart happened
+ final long actualModVersion = getExecutionGraph().getGlobalModVersion();
+ if (actualModVersion > originatingGlobalModVersion) {
+ // global change happened since, reject this action
+ throw new GlobalModVersionMismatch(originatingGlobalModVersion, actualModVersion);
+ }
+
final Execution oldExecution = currentExecution;
final ExecutionState oldState = oldExecution.getState();
@@ -522,8 +563,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
this,
- oldExecution.getAttemptNumber()+1,
- System.currentTimeMillis(),
+ oldExecution.getAttemptNumber() + 1,
+ originatingGlobalModVersion,
+ timestamp,
timeout);
this.currentExecution = newExecution;
@@ -533,6 +575,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
}
+ // register this execution at the execution graph, to receive call backs
+ getExecutionGraph().registerExecution(newExecution);
+
// if the execution was 'FINISHED' before, tell the ExecutionGraph that
// we take one step back on the road to reaching global FINISHED
if (oldState == FINISHED) {
@@ -640,9 +685,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// --------------------------------------------------------------------------------------------
void executionFinished(Execution execution) {
- if (execution == currentExecution) {
- getExecutionGraph().vertexFinished();
- }
+ getExecutionGraph().vertexFinished();
}
void executionCanceled(Execution execution) {
@@ -658,10 +701,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// --------------------------------------------------------------------------------------------
/**
- * Simply forward this notification. This is for logs and event archivers.
+ * Forwards this notification to the ExecutionGraph, but only if the given execution is still the current one.
*/
- void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
+ void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
+ // only forward this notification if the execution is still the current execution
+ // otherwise we have an outdated execution
+ if (currentExecution == execution) {
+ getExecutionGraph().notifyExecutionChange(execution, newState, error);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
new file mode 100644
index 0000000..bc96805
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * An exception that indicates a mismatch between the expected global modification version
+ * of the execution graph, and the actual modification version.
+ */
+public class GlobalModVersionMismatch extends Exception {
+
+ private static final long serialVersionUID = 6643688395797045098L;
+
+ private final long expectedModVersion;
+
+ private final long actualModVersion;
+
+ public GlobalModVersionMismatch(long expectedModVersion, long actualModVersion) {
+ super("expected=" + expectedModVersion + ", actual=" + actualModVersion);
+ this.expectedModVersion = expectedModVersion;
+ this.actualModVersion = actualModVersion;
+ }
+
+ public long expectedModVersion() {
+ return expectedModVersion;
+ }
+
+ public long actualModVersion() {
+ return actualModVersion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
new file mode 100644
index 0000000..2c4313f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+/**
+ * A {@code FailoverStrategy} describes how the job computation recovers from task
+ * failures.
+ *
+ * <p>Failover strategies implement recovery logic for failures of tasks. The execution
+ * graph still implements "global failure / recovery" (which restarts all tasks) as
+ * a fallback plan or safety net in cases where it deems that the state of the graph
+ * may have become inconsistent.
+ */
+public abstract class FailoverStrategy {
+
+
+ // ------------------------------------------------------------------------
+ // failover implementation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Called by the execution graph when a task failure occurs.
+ *
+ * @param taskExecution The execution attempt of the failed task.
+ * @param cause The exception that caused the task failure.
+ */
+ public abstract void onTaskFailure(Execution taskExecution, Throwable cause);
+
+ /**
+ * Called whenever new vertices are added to the ExecutionGraph.
+ *
+ * @param newJobVerticesTopological The newly added vertices, in topological order.
+ */
+ public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);
+
+ /**
+ * Gets the name of the failover strategy, for logging purposes.
+ */
+ public abstract String getStrategyName();
+
+ /**
+ * Tells the FailoverStrategy to register its metrics.
+ *
+ * <p>The default implementation does nothing
+ *
+ * @param metricGroup The metric group to register the metrics at
+ */
+ public void registerMetrics(MetricGroup metricGroup) {}
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * This factory is a necessary indirection when creating the FailoverStrategy so that
+ * we can have both the FailoverStrategy final in the ExecutionGraph, and the
+ * ExecutionGraph final in the FailoverStrategy.
+ */
+ public interface Factory {
+
+ /**
+ * Instantiates the {@code FailoverStrategy}.
+ *
+ * @param executionGraph The execution graph for which the strategy implements failover.
+ * @return The instantiated failover strategy.
+ */
+ FailoverStrategy create(ExecutionGraph executionGraph);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
new file mode 100644
index 0000000..f18a90f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A utility class to load failover strategies from the configuration.
+ */
+public class FailoverStrategyLoader {
+
+ /** Config name for the {@link RestartAllStrategy} */
+ public static final String FULL_RESTART_STRATEGY_NAME = "full";
+
+ /** Config name for the strategy that restarts individual tasks */
+ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Loads a FailoverStrategy Factory from the given configuration.
+ */
+ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
+ final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
+
+ if (StringUtils.isNullOrWhitespaceOnly(strategyParam)) {
+ if (logger != null) {
+ logger.warn("Null config value for {} ; using default failover strategy (full restarts).",
+ JobManagerOptions.EXECUTION_FAILOVER_STRATEGY.key());
+ }
+
+ return new RestartAllStrategy.Factory();
+ }
+ else {
+ switch (strategyParam.toLowerCase()) {
+ case FULL_RESTART_STRATEGY_NAME:
+ return new RestartAllStrategy.Factory();
+
+ case INDIVIDUAL_RESTART_STRATEGY_NAME:
+ return new RestartIndividualStrategy.Factory();
+
+ default:
+ // we could interpret the parameter as a factory class name and instantiate that
+ // for now we simply do not support this
+ throw new IllegalConfigurationException("Unknown failover strategy: " + strategyParam);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
new file mode 100644
index 0000000..21f42b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that triggers a restart of all tasks in the
+ * execution graph, via {@link ExecutionGraph#failGlobal(Throwable)}.
+ */
+public class RestartAllStrategy extends FailoverStrategy {
+
+ /** The execution graph to recover */
+ private final ExecutionGraph executionGraph;
+
+ /**
+ * Creates a new failover strategy that recovers from failures by restarting all tasks
+ * of the execution graph.
+ *
+ * @param executionGraph The execution graph to handle.
+ */
+ public RestartAllStrategy(ExecutionGraph executionGraph) {
+ this.executionGraph = checkNotNull(executionGraph);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void onTaskFailure(Execution taskExecution, Throwable cause) {
+ // this strategy makes every task failure a global failure
+ executionGraph.failGlobal(cause);
+ }
+
+ @Override
+ public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+ // nothing to do
+ }
+
+ @Override
+ public String getStrategyName() {
+ return "full graph restart";
+ }
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory that instantiates the RestartAllStrategy.
+ */
+ public static class Factory implements FailoverStrategy.Factory {
+
+ @Override
+ public FailoverStrategy create(ExecutionGraph executionGraph) {
+ return new RestartAllStrategy(executionGraph);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
new file mode 100644
index 0000000..0a449b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that restarts each task individually.
+ * This strategy is only applicable if the entire job consists of unconnected
+ * tasks, meaning each task is its own component.
+ */
+public class RestartIndividualStrategy extends FailoverStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The execution graph to recover */
+ private final ExecutionGraph executionGraph;
+
+ /** The executor that executes restart callbacks */
+ private final Executor callbackExecutor;
+
+ private final SimpleCounter numTaskFailures;
+
+ /**
+ * Creates a new failover strategy that recovers from task failures by restarting each
+ * failed task individually.
+ *
+ * <p>The strategy will use the ExecutionGraph's future executor for callbacks.
+ *
+ * @param executionGraph The execution graph to handle.
+ */
+ public RestartIndividualStrategy(ExecutionGraph executionGraph) {
+ this(executionGraph, executionGraph.getFutureExecutor());
+ }
+
+ /**
+ * Creates a new failover strategy that recovers from task failures by restarting each
+ * failed task individually.
+ *
+ * @param executionGraph The execution graph to handle.
+ * @param callbackExecutor The executor that executes restart callbacks
+ */
+ public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor callbackExecutor) {
+ this.executionGraph = checkNotNull(executionGraph);
+ this.callbackExecutor = checkNotNull(callbackExecutor);
+
+ this.numTaskFailures = new SimpleCounter();
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void onTaskFailure(Execution taskExecution, Throwable cause) {
+
+ // to better handle the lack of resources (potentially by a scale-in), we
+ // make failures due to missing resources global failures
+ if (cause instanceof NoResourceAvailableException) {
+ LOG.info("Not enough resources to schedule {} - triggering full recovery.", taskExecution);
+ executionGraph.failGlobal(cause);
+ return;
+ }
+
+ LOG.info("Recovering task failure for {} (#{}) via individual restart.",
+ taskExecution.getVertex().getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber());
+
+ numTaskFailures.inc();
+
+ // trigger the restart once the task has reached its terminal state
+ // Note: currently all tasks passed here are already in their terminal state,
+ // so we could actually avoid the future. We use it anyway because it is cheap and
+ // it helps to support better testing
+ final Future<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+
+ final ExecutionVertex vertexToRecover = taskExecution.getVertex();
+ final long globalModVersion = taskExecution.getGlobalModVersion();
+
+ terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
+ @Override
+ public void accept(ExecutionState value) {
+ try {
+ long createTimestamp = System.currentTimeMillis();
+ Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
+ newExecution.scheduleForExecution();
+ }
+ catch (GlobalModVersionMismatch e) {
+ // this happens if a concurrent global recovery happens. simply do nothing.
+ }
+ catch (Exception e) {
+ executionGraph.failGlobal(
+ new Exception("Error during fine grained recovery - triggering full recovery", e));
+ }
+ }
+ }, callbackExecutor);
+ }
+
+ @Override
+ public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+ // we validate here that the vertices are in fact not connected to
+ // any other vertices
+ for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+ List<IntermediateResult> inputs = ejv.getInputs();
+ IntermediateResult[] outputs = ejv.getProducedDataSets();
+
+ if ((inputs != null && inputs.size() > 0) || (outputs != null && outputs.length > 0)) {
+ throw new FlinkRuntimeException("Incompatible failover strategy - strategy '" +
+ getStrategyName() + "' can only handle jobs with only disconnected tasks.");
+ }
+ }
+ }
+
+ @Override
+ public String getStrategyName() {
+ return "Individual Task Restart";
+ }
+
+ @Override
+ public void registerMetrics(MetricGroup metricGroup) {
+ metricGroup.counter("task_failures", numTaskFailures);
+ }
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory that instantiates the RestartIndividualStrategy.
+ */
+ public static class Factory implements FailoverStrategy.Factory {
+
+ @Override
+ public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
+ return new RestartIndividualStrategy(executionGraph);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
new file mode 100644
index 0000000..05a6414
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge which returns the number of full restarts.
+ */
+public class NumberOfFullRestartsGauge implements Gauge<Long> {
+
+ public static final String METRIC_NAME = "fullRestarts";
+
+ // ------------------------------------------------------------------------
+
+ private final ExecutionGraph eg;
+
+ public NumberOfFullRestartsGauge(ExecutionGraph executionGraph) {
+ this.eg = checkNotNull(executionGraph);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Long getValue() {
+ return eg.getNumberOfFullRestarts();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 99dbc86..e60ff77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -376,7 +376,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
- executionGraph.fail(t);
+ executionGraph.failGlobal(t);
}
}
});
[4/5] flink git commit: [FLINK-6340] [flip-1] Add a termination
future to the Execution
Posted by se...@apache.org.
[FLINK-6340] [flip-1] Add a termination future to the Execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0061272
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0061272
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0061272
Branch: refs/heads/master
Commit: e00612726991a05058168e5a4fbfb53853e645a5
Parents: aadfe45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:49:54 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 26 ++-
.../runtime/executiongraph/ExecutionGraph.java | 182 ++++++++++------
.../executiongraph/ExecutionJobVertex.java | 116 +++-------
.../runtime/executiongraph/ExecutionVertex.java | 56 +++--
.../ExecutionGraphRestartTest.java | 78 ++++---
.../executiongraph/ExecutionGraphTestUtils.java | 8 +-
.../ExecutionStateProgressTest.java | 94 --------
.../TerminalStateDeadlockTest.java | 216 -------------------
8 files changed, 250 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 729e161..2680849 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+ /** A future that completes once the Execution reaches a terminal ExecutionState */
+ private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+
private volatile ExecutionState state = CREATED;
private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived
@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
markTimestamp(ExecutionState.CREATED, startTimestamp);
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
+ this.terminationFuture = new FlinkCompletableFuture<>();
}
// --------------------------------------------------------------------------------------------
@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
this.taskState = checkpointStateHandles;
}
+ /**
+ * Gets a future that completes once the task execution reaches a terminal state.
+ * The future will be completed with the specific terminal state that the execution reached.
+ *
+ * @return A future for the execution's termination
+ */
+ public Future<ExecutionState> getTerminationFuture() {
+ return terminationFuture;
+ }
+
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
finally {
- vertex.executionCanceled();
+ vertex.executionCanceled(this);
+ terminationFuture.complete(CANCELED);
}
return;
}
@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionFinished();
+ vertex.executionFinished(this);
+ terminationFuture.complete(FINISHED);
}
return;
}
@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionCanceled();
+ vertex.executionCanceled(this);
+ terminationFuture.complete(CANCELED);
}
return;
}
@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionFailed(t);
+ vertex.executionFailed(this, t);
+ terminationFuture.complete(FAILED);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 23ed99d..fff1ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Registered KvState instances reported by the TaskManagers. */
private final KvStateLocationRegistry kvStateLocationRegistry;
+ private int numVerticesTotal;
+
// ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
+ private final AtomicInteger verticesFinished;
+
/** Current status of the job execution */
private volatile JobStatus state = JobStatus.CREATED;
@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* that was not recoverable and triggered job failure */
private volatile Throwable failureCause;
- /** The number of job vertices that have reached a terminal state */
- private volatile int numFinishedJobVertices;
-
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+
+ this.verticesFinished = new AtomicInteger();
}
// --------------------------------------------------------------------------------------------
@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return jv.getTaskVertices();
}
else {
- ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+ ArrayList<ExecutionVertex> all = new ArrayList<>();
for (ExecutionJobVertex jv : jobVertices) {
if (jv.getGraph() != this) {
throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
};
}
+ public int getTotalNumberOfVertices() {
+ return numVerticesTotal;
+ }
+
public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
return Collections.unmodifiableMap(this.intermediateResults);
}
@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*/
public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
- Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+ Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
for (ExecutionVertex vertex : getAllExecutionVertices()) {
Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
- Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+ Map<String, SerializedValue<Object>> result = new HashMap<>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
}
@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
this.verticesInCreationOrder.add(ejv);
+ this.numVerticesTotal += ejv.getParallelism();
}
}
@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) {
+
+ final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+ // cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- ejv.cancel();
+ futures.add(ejv.cancelWithFuture());
}
+
+ // we build a future that is complete once all vertices have reached a terminal state
+ final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+ allTerminal.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ allVerticesInTerminalState();
+ }
+ });
+
return;
}
}
@@ -968,26 +992,33 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
- } else if (current == JobStatus.RESTARTING) {
+ }
+ else if (current == JobStatus.RESTARTING) {
this.failureCause = t;
if (tryRestartOrFail()) {
return;
}
- // concurrent job status change, let's check again
- } else if (transitionState(current, JobStatus.FAILING, t)) {
+ }
+ else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
- if (!verticesInCreationOrder.isEmpty()) {
- // cancel all. what is failed will not cancel but stay failed
- for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- ejv.cancel();
- }
- } else {
- // set the state of the job to failed
- transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
+ // we build a future that is complete once all vertices have reached a terminal state
+ final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+ // cancel all tasks (that still need cancelling)
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ futures.add(ejv.cancelWithFuture());
}
+ final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+ allTerminal.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ allVerticesInTerminalState();
+ }
+ });
+
return;
}
@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
stateTimestamps[i] = 0;
}
}
- numFinishedJobVertices = 0;
+
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
// if we have checkpointed state, reload it into the executions
@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* For testing: This waits until the job execution has finished.
*/
public void waitUntilFinished() throws InterruptedException {
- synchronized (progressLock) {
- while (!state.isTerminalState()) {
- progressLock.wait();
+ // we may need multiple attempts in the presence of failures / recovery
+ while (true) {
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ try {
+ ev.getCurrentExecutionAttempt().getTerminationFuture().get();
+ }
+ catch (ExecutionException e) {
+ // this should never happen
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // now that all vertices have been (at some point) in a terminal state,
+ // we need to check if the job as a whole has entered a final state
+ if (state.isTerminalState()) {
+ return;
}
}
}
@@ -1129,59 +1175,57 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- void jobVertexInFinalState() {
- synchronized (progressLock) {
- if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
- throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
- }
-
- numFinishedJobVertices++;
+ void vertexFinished() {
+ int numFinished = verticesFinished.incrementAndGet();
+ if (numFinished == numVerticesTotal) {
+ // done :-)
+ allVerticesInTerminalState();
+ }
+ }
- if (numFinishedJobVertices == verticesInCreationOrder.size()) {
+ void vertexUnFinished() {
+ verticesFinished.getAndDecrement();
+ }
- // we are done, transition to the final state
- JobStatus current;
- while (true) {
- current = this.state;
+ private void allVerticesInTerminalState() {
+ // we are done, transition to the final state
+ JobStatus current;
+ while (true) {
+ current = this.state;
- if (current == JobStatus.RUNNING) {
- if (transitionState(current, JobStatus.FINISHED)) {
- postRunCleanup();
- break;
- }
- }
- else if (current == JobStatus.CANCELLING) {
- if (transitionState(current, JobStatus.CANCELED)) {
- postRunCleanup();
- break;
- }
- }
- else if (current == JobStatus.FAILING) {
- if (tryRestartOrFail()) {
- break;
- }
- // concurrent job status change, let's check again
- }
- else if (current == JobStatus.SUSPENDED) {
- // we've already cleaned up when entering the SUSPENDED state
- break;
- }
- else if (current.isGloballyTerminalState()) {
- LOG.warn("Job has entered globally terminal state without waiting for all " +
- "job vertices to reach final state.");
- break;
- }
- else {
- fail(new Exception("ExecutionGraph went into final state from state " + current));
- break;
- }
+ if (current == JobStatus.RUNNING) {
+ if (transitionState(current, JobStatus.FINISHED)) {
+ postRunCleanup();
+ break;
}
- // done transitioning the state
-
- // also, notify waiters
- progressLock.notifyAll();
+ }
+ else if (current == JobStatus.CANCELLING) {
+ if (transitionState(current, JobStatus.CANCELED)) {
+ postRunCleanup();
+ break;
+ }
+ }
+ else if (current == JobStatus.FAILING) {
+ if (tryRestartOrFail()) {
+ break;
+ }
+ // concurrent job status change, let's check again
+ }
+ else if (current == JobStatus.SUSPENDED) {
+ // we've already cleaned up when entering the SUSPENDED state
+ break;
+ }
+ else if (current.isGloballyTerminalState()) {
+ LOG.warn("Job has entered globally terminal state without waiting for all " +
+ "job vertices to reach final state.");
+ break;
+ }
+ else {
+ fail(new Exception("ExecutionGraph went into final state from state " + current));
+ break;
}
}
+ // done transitioning the state
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2e5de64..3197e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,9 +66,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
public static final int VALUE_NOT_SET = -1;
private final Object stateMonitor = new Object();
-
+
private final ExecutionGraph graph;
-
+
private final JobVertex jobVertex;
/**
@@ -91,12 +92,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
-
+
private final List<IntermediateResult> inputs;
-
- private final int parallelism;
- private final boolean[] finishedSubtasks;
+ private final int parallelism;
private final SlotSharingGroup slotSharingGroup;
@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private int maxParallelism;
- private volatile int numSubtasksInFinalState;
-
/**
* Serialized task information which is for all sub tasks the same. Thus, it avoids to
* serialize the same information multiple times in order to create the
@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
}
-
- finishedSubtasks = new boolean[parallelism];
}
/**
@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return serializedTaskInformation;
}
- public boolean isInFinalState() {
- return numSubtasksInFinalState == parallelism;
- }
-
@Override
public ExecutionState getAggregateState() {
int[] num = new int[ExecutionState.values().length];
@@ -484,51 +475,51 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return slots;
}
+ /**
+ * Cancels all currently running vertex executions.
+ */
public void cancel() {
for (ExecutionVertex ev : getTaskVertices()) {
ev.cancel();
}
}
-
- public void fail(Throwable t) {
+
+ /**
+ * Cancels all currently running vertex executions.
+ *
+ * @return A future that is complete once all tasks have reached a terminal state.
+ */
+ public Future<Void> cancelWithFuture() {
+ // we collect all futures from the task cancellations
+ ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+
+ // cancel each vertex
for (ExecutionVertex ev : getTaskVertices()) {
- ev.fail(t);
+ futures.add(ev.cancel());
}
+
+ // return a conjunct future, which is complete once all individual tasks have reached a terminal state
+ return FutureUtils.combineAll(futures);
}
-
- public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
- synchronized (stateMonitor) {
- while (numSubtasksInFinalState < parallelism) {
- stateMonitor.wait();
- }
+
+ public void fail(Throwable t) {
+ for (ExecutionVertex ev : getTaskVertices()) {
+ ev.fail(t);
}
}
-
+
public void resetForNewExecution() {
- if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
- throw new IllegalStateException("Cannot reset vertex that is not in final state");
- }
-
+
synchronized (stateMonitor) {
// check and reset the sharing groups with scheduler hints
if (slotSharingGroup != null) {
slotSharingGroup.clearTaskAssignment();
}
-
- // reset vertices one by one. if one reset fails, the "vertices in final state"
- // fields will be consistent to handle triggered cancel calls
+
for (int i = 0; i < parallelism; i++) {
taskVertices[i].resetForNewExecution();
- if (finishedSubtasks[i]) {
- finishedSubtasks[i] = false;
- numSubtasksInFinalState--;
- }
- }
-
- if (numSubtasksInFinalState != 0) {
- throw new RuntimeException("Bug: resetting the execution job vertex failed.");
}
-
+
// set up the input splits again
try {
if (this.inputSplits != null) {
@@ -548,51 +539,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
}
-
- //---------------------------------------------------------------------------------------------
- // Notifications
- //---------------------------------------------------------------------------------------------
-
- void vertexFinished(int subtask) {
- subtaskInFinalState(subtask);
- }
-
- void vertexCancelled(int subtask) {
- subtaskInFinalState(subtask);
- }
-
- void vertexFailed(int subtask, Throwable error) {
- subtaskInFinalState(subtask);
- }
-
- private void subtaskInFinalState(int subtask) {
- synchronized (stateMonitor) {
- if (!finishedSubtasks[subtask]) {
- finishedSubtasks[subtask] = true;
-
- if (numSubtasksInFinalState+1 == parallelism) {
-
- // call finalizeOnMaster hook
- try {
- getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
- }
- catch (Throwable t) {
- getGraph().fail(t);
- }
-
- numSubtasksInFinalState++;
-
- // we are in our final state
- stateMonitor.notifyAll();
-
- // tell the graph
- graph.jobVertexInFinalState();
- } else {
- numSubtasksInFinalState++;
- }
- }
- }
- }
// --------------------------------------------------------------------------------------------
// Accumulators / Metrics
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3f6ce88..bcf7a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -60,8 +61,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
/**
@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Actions
// --------------------------------------------------------------------------------------------
- public void resetForNewExecution() {
+ public Execution resetForNewExecution() {
LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) {
- Execution execution = currentExecution;
- ExecutionState state = execution.getState();
+ final Execution oldExecution = currentExecution;
+ final ExecutionState oldState = oldExecution.getState();
- if (state == FINISHED || state == CANCELED || state == FAILED) {
- priorExecutions.add(execution);
- currentExecution = new Execution(
+ if (oldState.isTerminal()) {
+ priorExecutions.add(oldExecution);
+
+ final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
this,
- execution.getAttemptNumber()+1,
+ oldExecution.getAttemptNumber()+1,
System.currentTimeMillis(),
timeout);
+ this.currentExecution = newExecution;
+
CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
}
+
+ // if the execution was 'FINISHED' before, tell the ExecutionGraph that
+ // we take one step back on the road to reaching global FINISHED
+ if (oldState == FINISHED) {
+ getExecutionGraph().vertexUnFinished();
+ }
+
+ return newExecution;
}
else {
- throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
+ throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
}
}
}
@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.currentExecution.deployToSlot(slot);
}
- public void cancel() {
- this.currentExecution.cancel();
+ /**
+ * Cancels the current execution attempt of this vertex.
+ * @return A future that completes once the execution has reached its final state.
+ */
+ public Future<ExecutionState> cancel() {
+ // to avoid any case of mixup in the presence of concurrent calls,
+ // we copy a reference to the stack to make sure both calls go to the same Execution
+ final Execution exec = this.currentExecution;
+ exec.cancel();
+ return exec.getTerminationFuture();
}
public void stop() {
@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
- void executionFinished() {
- jobVertex.vertexFinished(subTaskIndex);
+ void executionFinished(Execution execution) {
+ if (execution == currentExecution) {
+ getExecutionGraph().vertexFinished();
+ }
}
- void executionCanceled() {
- jobVertex.vertexCancelled(subTaskIndex);
+ void executionCanceled(Execution execution) {
+ // nothing to do
}
- void executionFailed(Throwable t) {
- jobVertex.vertexFailed(subTaskIndex, t);
+ void executionFailed(Execution execution, Throwable cause) {
+ // nothing to do
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1729582..1ebfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
+
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testCancelWhileFailing() throws Exception {
- // We want to manually control the restart and delay
- RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
- ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
- Instance instance = executionGraphInstanceTuple.f1;
- doNothing().when(executionGraph).jobVertexInFinalState();
+ final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+ final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
- // Kill the instance...
- instance.markDead();
+ assertEquals(JobStatus.RUNNING, graph.getState());
- Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+ // switch all tasks to running
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- // ...and wait for all vertices to be in state FAILED. The
- // jobVertexInFinalState does nothing, that's why we don't wait on the
- // job status.
- boolean success = false;
- while (deadline.hasTimeLeft() && !success) {
- success = true;
- for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
- ExecutionState state = vertex.getExecutionState();
- if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
- success = false;
- Thread.sleep(100);
- break;
- }
- }
+ graph.fail(new Exception("test"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+
+ graph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+
+ // let all tasks finish cancelling
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
}
- // Still in failing
- assertEquals(JobStatus.FAILING, executionGraph.getState());
+ assertEquals(JobStatus.CANCELED, graph.getState());
+ }
- // The cancel call needs to change the state to CANCELLING
- executionGraph.cancel();
+ @Test
+ public void testFailWhileCanceling() throws Exception {
+ final RestartStrategy restartStrategy = new NoRestartStrategy();
+ final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
- assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+ assertEquals(JobStatus.RUNNING, graph.getState());
- // Unspy and finalize the job state
- doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+ // switch all tasks to running
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- executionGraph.jobVertexInFinalState();
+ graph.cancel();
- assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+
+ graph.fail(new Exception("test"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+
+ // let all tasks finish cancelling
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
+ }
+
+ assertEquals(JobStatus.FAILED, graph.getState());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b0137fa..73838dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
-import org.mockito.Matchers;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
}
};
- doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
- doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
- doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
return ejv;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
deleted file mode 100644
index bd51c81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
- @Test
- public void testAccumulatedStateFinished() {
- try {
- final JobID jid = new JobID();
- final JobVertexID vid = new JobVertexID();
-
- JobVertex ajv = new JobVertex("TestVertex", vid);
- ajv.setParallelism(3);
- ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
- ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jid,
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
- graph.attachJobGraph(Collections.singletonList(ajv));
-
- setGraphStatus(graph, JobStatus.RUNNING);
-
- ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
- // mock resources and mock taskmanager
- for (ExecutionVertex ee : ejv.getTaskVertices()) {
- SimpleSlot slot = getInstance(
- new ActorTaskManagerGateway(
- new SimpleActorGateway(
- TestingUtils.defaultExecutionContext()))
- ).allocateSimpleSlot(jid);
- ee.deployToSlot(slot);
- }
-
- // finish all
- for (ExecutionVertex ee : ejv.getTaskVertices()) {
- ee.executionFinished();
- }
-
- assertTrue(ejv.isInFinalState());
- assertEquals(JobStatus.FINISHED, graph.getState());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
deleted file mode 100644
index d717986..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.*;
-
-public class TerminalStateDeadlockTest {
-
- private final Field stateField;
- private final Field resourceField;
- private final Field execGraphStateField;
- private final Field execGraphSlotProviderField;
-
- private final SimpleSlot resource;
-
-
- public TerminalStateDeadlockTest() {
- try {
- // the reflection fields to access the private fields
- this.stateField = Execution.class.getDeclaredField("state");
- this.stateField.setAccessible(true);
-
- this.resourceField = Execution.class.getDeclaredField("assignedResource");
- this.resourceField.setAccessible(true);
-
- this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
- this.execGraphStateField.setAccessible(true);
-
- this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
- this.execGraphSlotProviderField.setAccessible(true);
-
- // the dummy resource
- ResourceID resourceId = ResourceID.generate();
- InetAddress address = InetAddress.getByName("127.0.0.1");
- TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
-
- HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
- Instance instance = new Instance(
- new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), resources, 4);
-
- this.resource = instance.allocateSimpleSlot(new JobID());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
-
- // silence the compiler
- throw new RuntimeException();
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testProvokeDeadlock() {
- try {
- final JobID jobId = resource.getJobID();
- final JobVertexID vid1 = new JobVertexID();
- final JobVertexID vid2 = new JobVertexID();
-
- final List<JobVertex> vertices;
- {
- JobVertex v1 = new JobVertex("v1", vid1);
- JobVertex v2 = new JobVertex("v2", vid2);
- v1.setParallelism(1);
- v2.setParallelism(1);
- v1.setInvokableClass(DummyInvokable.class);
- v2.setInvokableClass(DummyInvokable.class);
- vertices = Arrays.asList(v1, v2);
- }
-
- final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
- final Executor executor = Executors.newFixedThreadPool(4);
-
- // try a lot!
- for (int i = 0; i < 20000; i++) {
- final TestExecGraph eg = new TestExecGraph(jobId);
- eg.attachJobGraph(vertices);
-
- final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
- final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
-
- initializeExecution(e1);
- initializeExecution(e2);
-
- execGraphStateField.set(eg, JobStatus.FAILING);
- execGraphSlotProviderField.set(eg, scheduler);
-
- Runnable r1 = new Runnable() {
- @Override
- public void run() {
- e1.cancelingComplete();
- }
- };
- Runnable r2 = new Runnable() {
- @Override
- public void run() {
- e2.cancelingComplete();
- }
- };
-
- executor.execute(r1);
- executor.execute(r2);
-
- eg.waitTillDone();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private void initializeExecution(Execution exec) throws IllegalAccessException {
- // set state to canceling
- stateField.set(exec, ExecutionState.CANCELING);
-
- // assign a resource
- resourceField.set(exec, resource);
- }
-
-
- static class TestExecGraph extends ExecutionGraph {
-
- private static final Configuration EMPTY_CONFIG = new Configuration();
-
- private static final Time TIMEOUT = Time.seconds(30L);
-
- private volatile boolean done;
-
- TestExecGraph(JobID jobId) throws IOException {
- super(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- "test graph",
- EMPTY_CONFIG,
- new SerializedValue<>(new ExecutionConfig()),
- TIMEOUT,
- new FixedDelayRestartStrategy(1, 0),
- new Scheduler(TestingUtils.defaultExecutionContext()));
- }
-
- @Override
- public void scheduleForExecution() {
- // notify that we are done with the "restarting"
- synchronized (this) {
- done = true;
- this.notifyAll();
- }
- }
-
- public void waitTillDone() {
- try {
- synchronized (this) {
- while (!done) {
- this.wait();
- }
- }
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
[2/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction
for Failover Strategies to ExecutionGraph
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b01ddc0..479ec51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1345,7 +1345,7 @@ class JobManager(
currentJobs.remove(jobId)
if (executionGraph != null) {
- executionGraph.fail(t)
+ executionGraph.failGlobal(t)
}
val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
@@ -1426,7 +1426,7 @@ class JobManager(
}
} catch {
case t: Throwable => try {
- executionGraph.fail(t)
+ executionGraph.failGlobal(t)
} catch {
case tt: Throwable =>
log.error("Error while marking ExecutionGraph as failed.", tt)
@@ -1837,7 +1837,7 @@ class JobManager(
job =>
future {
// Fail the execution graph
- job._1.fail(new IllegalStateException("Another JobManager removed the job from " +
+ job._1.failGlobal(new IllegalStateException("Another JobManager removed the job from " +
"ZooKeeper."))
}(context.dispatcher)
)
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 41b0e35..9250634 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -3135,6 +3135,7 @@ public class CheckpointCoordinatorTest {
vertex,
1,
1L,
+ 1L,
Time.milliseconds(500L)
));
when(exec.getAttemptId()).thenReturn(attemptID);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 2e0bd76..99a3815 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -90,7 +90,7 @@ public class CoordinatorShutdownTest {
FailingBlockingInvokable.unblock();
- graph.waitUntilFinished();
+ graph.waitUntilTerminal();
// verify that the coordinator was shut down
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
@@ -149,7 +149,7 @@ public class CoordinatorShutdownTest {
BlockingInvokable.unblock();
- graph.waitUntilFinished();
+ graph.waitUntilTerminal();
// verify that the coordinator was shut down
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1f038bd..a524e5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -56,7 +57,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
- graph.fail(new Exception("Test Exception"));
+ graph.failGlobal(new Exception("Test Exception"));
verify(counter, times(1)).shutdown(JobStatus.FAILED);
verify(store, times(1)).shutdown(eq(JobStatus.FAILED));
@@ -91,6 +92,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
new SerializedValue<>(new ExecutionConfig()),
Time.days(1L),
new NoRestartStrategy(),
+ new RestartAllStrategy.Factory(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
new Scheduler(TestingUtils.defaultExecutionContext()),
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 866f55c..a739e14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -24,11 +24,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotEquals;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -342,50 +340,6 @@ public class ExecutionGraphDeploymentTest {
}
@Test
- public void testRegistrationOfExecutionsFailingFinalize() {
- try {
-
- final JobVertexID jid1 = new JobVertexID();
- final JobVertexID jid2 = new JobVertexID();
-
- JobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
- JobVertex v2 = new JobVertex("v2", jid2);
-
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4).f1;
-
- List<Execution> execList = new ArrayList<Execution>();
- execList.addAll(executions.values());
- // sort executions by job vertex. Failing job vertex first
- Collections.sort(execList, new Comparator<Execution>() {
- @Override
- public int compare(Execution o1, Execution o2) {
- return o1.getVertex().getTaskNameWithSubtaskIndex().compareTo(o2.getVertex().getTaskNameWithSubtaskIndex());
- }
- });
-
- int cnt = 0;
- for (Execution e : execList) {
- cnt++;
- e.markFinished();
- if (cnt <= 6) {
- // the last execution of the first job vertex triggers the failing finalize hook
- assertEquals(ExecutionState.FINISHED, e.getState());
- }
- else {
- // all following executions should be canceled
- assertEquals(ExecutionState.CANCELED, e.getState());
- }
- }
-
- assertEquals(0, executions.size());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
/**
* Tests that a blocking batch job fails if there are not enough resources left to schedule the
* succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
@@ -473,24 +427,6 @@ public class ExecutionGraphDeploymentTest {
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
}
- @Test
- public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
-
- final int negativeMaxNumberOfCheckpointsToRetain = -10;
-
- final Configuration jobManagerConfig = new Configuration();
- jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
- negativeMaxNumberOfCheckpointsToRetain);
-
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
-
- assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-
- assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
-
private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
final JobID jobId = new JobID();
@@ -503,24 +439,24 @@ public class ExecutionGraphDeploymentTest {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
for (int i = 0; i < dop1 + dop2; i++) {
scheduler.newInstanceAvailable(
- ExecutionGraphTestUtils.getInstance(
- new ActorTaskManagerGateway(
- new ExecutionGraphTestUtils.SimpleActorGateway(
- TestingUtils.directExecutionContext()))));
+ ExecutionGraphTestUtils.getInstance(
+ new ActorTaskManagerGateway(
+ new ExecutionGraphTestUtils.SimpleActorGateway(
+ TestingUtils.directExecutionContext()))));
}
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
- new DirectScheduledExecutorService(),
- TestingUtils.defaultExecutor(),
- jobId,
- "some job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- scheduler);
-
+ new DirectScheduledExecutorService(),
+ TestingUtils.defaultExecutor(),
+ jobId,
+ "some job",
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ new NoRestartStrategy(),
+ scheduler);
+
eg.setQueuedSchedulingAllowed(false);
List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -537,13 +473,27 @@ public class ExecutionGraphDeploymentTest {
return new Tuple2<>(eg, executions);
}
+ @Test
+ public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
+
+ final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+ negativeMaxNumberOfCheckpointsToRetain);
+
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+ assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+ }
+
@SuppressWarnings("serial")
public static class FailingFinalizeJobVertex extends JobVertex {
- public FailingFinalizeJobVertex(String name) {
- super(name);
- }
-
public FailingFinalizeJobVertex(String name, JobVertexID id) {
super(name, id);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 97127c7..6b5ceae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -52,9 +51,7 @@ import org.junit.Test;
import org.mockito.Matchers;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -133,10 +130,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
new SerializedValue<ExecutionConfig>(null),
timeout,
testingRestartStrategy,
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- scheduler,
- getClass().getClassLoader());
+ scheduler);
RestartTimeGauge restartingTime = new RestartTimeGauge(executionGraph);
@@ -242,8 +236,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
// for this to work, we have to use a SuppressRestartException
- executionGraph.fail(new SuppressRestartsException(new Exception()));
-
+ executionGraph.failGlobal(new SuppressRestartsException(new Exception()));
+
assertEquals(JobStatus.FAILED, executionGraph.getState());
previousRestartingTime = restartingTime.getValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1ebfcac..eeb6c69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -66,9 +66,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+
public class ExecutionGraphRestartTest extends TestLogger {
@@ -271,12 +270,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
// The restarting should not fail with an ordinary exception
- executionGraph.fail(new Exception("Test exception"));
+ executionGraph.failGlobal(new Exception("Test exception"));
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
// but it should fail when sending a SuppressRestartsException
- executionGraph.fail(new SuppressRestartsException(new Exception("Test exception")));
+ executionGraph.failGlobal(new SuppressRestartsException(new Exception("Test exception")));
assertEquals(JobStatus.FAILED, executionGraph.getState());
@@ -298,7 +297,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
vertex.getCurrentExecutionAttempt().switchToRunning();
}
- graph.fail(new Exception("test"));
+ graph.failGlobal(new Exception("test"));
assertEquals(JobStatus.FAILING, graph.getState());
@@ -330,7 +329,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.CANCELLING, graph.getState());
- graph.fail(new Exception("test"));
+ graph.failGlobal(new Exception("test"));
assertEquals(JobStatus.FAILING, graph.getState());
@@ -344,8 +343,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testNoRestartOnSuppressException() throws Exception {
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
- ExecutionGraph eg = executionGraphInstanceTuple.f0;
+ final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
// Fail with unrecoverable Exception
eg.getAllExecutionVertices().iterator().next().fail(
@@ -357,21 +355,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
- FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
- // Wait for async restart
- Deadline deadline = timeout.fromNow();
- while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) {
- Thread.sleep(100);
- }
-
+ eg.waitUntilTerminal();
assertEquals(JobStatus.FAILED, eg.getState());
- // No restart
- verify(eg, never()).restart();
-
RestartStrategy restartStrategy = eg.getRestartStrategy();
-
assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
@@ -443,7 +430,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
/**
* Tests that a graph is not restarted after cancellation via a call to
- * {@link ExecutionGraph#fail(Throwable)}. This can happen when a slot is
+ * {@link ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is
* released concurrently with cancellation.
*/
@Test
@@ -490,7 +477,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
/**
* Tests that it is possible to fail a graph via a call to
- * {@link ExecutionGraph#fail(Throwable)} after cancellation.
+ * {@link ExecutionGraph#failGlobal(Throwable)} after cancellation.
*/
@Test
public void testFailExecutionGraphAfterCancel() throws Exception {
@@ -523,7 +510,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
eg.cancel();
assertEquals(JobStatus.CANCELLING, eg.getState());
- eg.fail(new Exception("Test Exception"));
+ eg.failGlobal(new Exception("Test Exception"));
assertEquals(JobStatus.FAILING, eg.getState());
Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c8a0422..1eecd4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -300,8 +301,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotProvider.addSlots(targetVertex.getID(), targetFutures);
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
- TerminalJobStatusListener testListener = new TerminalJobStatusListener();
- eg.registerJobStatusListener(testListener);
//
// we complete some of the futures
@@ -322,7 +321,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
sourceFutures[1].completeExceptionally(new TestRuntimeException());
// wait until the job failed as a whole
- testListener.waitForTerminalState(2000);
+ eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
// wait until all slots are back
verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
@@ -371,8 +370,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotProvider.addSlots(vertex.getID(), slotFutures);
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider, Time.milliseconds(20));
- final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
- eg.registerJobStatusListener(statusListener);
// we complete one future
slotFutures[1].complete(slots[1]);
@@ -388,7 +385,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// since future[0] is still missing the while operation must time out
// we have no restarts allowed, so the job will go terminal
- statusListener.waitForTerminalState(2000);
+ eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
// wait until all slots are back
verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
deleted file mode 100644
index 27844c1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.api.mockito.PowerMockito;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ExecutionGraph.class)
-public class ExecutionGraphSignalsTest {
- private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
- private int[] dop = new int[] { 5, 7, 2, 11, 4 };
- private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
- private ExecutionGraph eg;
- private Field f;
-
- @Before
- public void prepare() throws Exception {
- final JobID jobId = new JobID();
- final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
-
-
- assert (mockEJV.length == 5);
- JobVertex v1 = new JobVertex("vertex1");
- JobVertex v2 = new JobVertex("vertex2");
- JobVertex v3 = new JobVertex("vertex3");
- JobVertex v4 = new JobVertex("vertex4");
- JobVertex v5 = new JobVertex("vertex5");
-
- for(int i = 0; i < mockEJV.length; ++i) {
- mockEJV[i] = mock(ExecutionJobVertex.class);
-
- this.mockEV[i] = new ExecutionVertex[dop[i]];
- for (int j = 0; j < dop[i]; ++j) {
- this.mockEV[i][j] = mock(ExecutionVertex.class);
- }
-
- when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
- when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
- }
-
- PowerMockito
- .whenNew(ExecutionJobVertex.class)
- .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
- any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
- PowerMockito
- .whenNew(ExecutionJobVertex.class)
- .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
- any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
- PowerMockito
- .whenNew(ExecutionJobVertex.class)
- .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
- any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
- PowerMockito
- .whenNew(ExecutionJobVertex.class)
- .withArguments(any(ExecutionGraph.class), same(v4), any(Integer.class).intValue(),
- any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[3]);
- PowerMockito
- .whenNew(ExecutionJobVertex.class)
- .withArguments(any(ExecutionGraph.class), same(v5), any(Integer.class).intValue(),
- any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[4]);
-
- v1.setParallelism(dop[0]);
- v2.setParallelism(dop[1]);
- v3.setParallelism(dop[2]);
- v4.setParallelism(dop[3]);
- v5.setParallelism(dop[4]);
-
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- mockNumberOfInputs(1,0);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- mockNumberOfInputs(3,1);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- mockNumberOfInputs(3,2);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- mockNumberOfInputs(4,3);
- v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- mockNumberOfInputs(4,2);
-
- List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
-
- eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
- eg.attachJobGraph(ordered);
-
- f = eg.getClass().getDeclaredField("state");
- f.setAccessible(true);
- }
-
- private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {
- for(int j = 0; j < dop[nodeIndex]; ++j) {
- when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]);
- }
- }
-
- @Test
- public void testCancel() throws Exception {
- assertEquals(JobStatus.CREATED, eg.getState());
- eg.cancel();
-
- verifyCancel(1);
-
- f.set(eg, JobStatus.RUNNING);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.CANCELLING, eg.getState());
-
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.CANCELLING, eg.getState());
-
- f.set(eg, JobStatus.CANCELED);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.CANCELED, eg.getState());
-
- f.set(eg, JobStatus.FAILED);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.FAILED, eg.getState());
-
- f.set(eg, JobStatus.FAILING);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.CANCELLING, eg.getState());
-
- f.set(eg, JobStatus.FINISHED);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.FINISHED, eg.getState());
-
- f.set(eg, JobStatus.RESTARTING);
- eg.cancel();
-
- verifyCancel(2);
- assertEquals(JobStatus.CANCELED, eg.getState());
- }
-
- private void verifyCancel(int times) {
- for (int i = 0; i < mockEJV.length; ++i) {
- verify(mockEJV[i], times(times)).cancel();
- }
- }
-
- /**
- * Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state.
- * Tests also that one cannot leave the SUSPENDED state to enter a terminal state.
- */
- @Test
- public void testSuspend() throws Exception {
- assertEquals(JobStatus.CREATED, eg.getState());
- Exception testException = new Exception("Test exception");
-
- eg.suspend(testException);
-
- verifyCancel(1);
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- f.set(eg, JobStatus.RUNNING);
-
- eg.suspend(testException);
-
- verifyCancel(2);
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- f.set(eg, JobStatus.FAILING);
-
- eg.suspend(testException);
-
- verifyCancel(3);
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- f.set(eg, JobStatus.CANCELLING);
-
- eg.suspend(testException);
-
- verifyCancel(4);
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- f.set(eg, JobStatus.FAILED);
-
- eg.suspend(testException);
-
- verifyCancel(4);
- assertEquals(JobStatus.FAILED, eg.getState());
-
- f.set(eg, JobStatus.FINISHED);
-
- eg.suspend(testException);
-
- verifyCancel(4);
- assertEquals(JobStatus.FINISHED, eg.getState());
-
- f.set(eg, JobStatus.CANCELED);
-
- eg.suspend(testException);
-
- verifyCancel(4);
- assertEquals(JobStatus.CANCELED, eg.getState());
-
- f.set(eg, JobStatus.SUSPENDED);
-
- eg.fail(testException);
-
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- eg.cancel();
-
- assertEquals(JobStatus.SUSPENDED, eg.getState());
- }
-
- // test that all source tasks receive STOP signal
- // test that all non-source tasks do not receive STOP signal
- @Test
- public void testStop() throws Exception {
- Field f = eg.getClass().getDeclaredField("isStoppable");
- f.setAccessible(true);
- f.set(eg, true);
-
- eg.stop();
-
- for (int i : new int[]{0,2}) {
- for (int j = 0; j < mockEV[i].length; ++j) {
- verify(mockEV[i][j], times(1)).stop();
- }
- }
-
- for (int i : new int[]{1,3,4}) {
- for (int j = 0; j < mockEV[i].length; ++j) {
- verify(mockEV[i][j], times(0)).stop();
- }
- }
- }
-
- /**
- * Test that failing in state restarting will retrigger the restarting logic. This means that
- * it only goes into the state FAILED after the restart strategy says the job is no longer
- * restartable.
- */
- @Test
- public void testFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
- Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
- restartStrategyField.setAccessible(true);
-
- restartStrategyField.set(eg, new InfiniteDelayRestartStrategy(1));
-
- f.set(eg, JobStatus.RESTARTING);
-
- eg.fail(new Exception("Test"));
-
- // we should restart since we have one restart attempt left
- assertEquals(JobStatus.RESTARTING, eg.getState());
-
- eg.fail(new Exception("Test"));
-
- // after depleting all our restart attempts we should go into Failed
- assertEquals(JobStatus.FAILED, eg.getState());
- }
-
- /**
- * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
- * immediately and sets the execution graph's state to FAILED.
- */
- @Test
- public void testSuppressRestartFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
- Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
- restartStrategyField.setAccessible(true);
-
- restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
-
- f.set(eg, JobStatus.RESTARTING);
-
- // suppress a possible restart
- eg.fail(new SuppressRestartsException(new Exception("Test")));
-
- assertEquals(JobStatus.FAILED, eg.getState());
- }
-
- /**
- * Tests that we can suspend a job when in state RESTARTING.
- */
- @Test
- public void testSuspendWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
- Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
- restartStrategyField.setAccessible(true);
-
- restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
-
- f.set(eg, JobStatus.RESTARTING);
-
- final Exception exception = new Exception("Suspended");
-
- eg.suspend(exception);
-
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- assertEquals(exception, eg.getFailureCause());
- }
-
- // STOP only supported if all sources are stoppable
- @Test(expected = StoppingException.class)
- public void testStopBatching() throws StoppingException {
- eg.stop();
- }
-
- /**
- * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
- * id, will not fail the execution graph.
- */
- @Test
- public void testFailingScheduleOrUpdateConsumers() throws IllegalAccessException {
- IntermediateResultPartitionID intermediateResultPartitionId = new IntermediateResultPartitionID();
- // The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call
- // should fail
- ExecutionAttemptID producerId = new ExecutionAttemptID();
- ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId);
-
- f.set(eg, JobStatus.RUNNING);
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- try {
- eg.scheduleOrUpdateConsumers(resultPartitionId);
- fail("Expected ExecutionGraphException.");
- } catch (ExecutionGraphException e) {
- // we've expected this exception to occur
- }
-
- assertEquals(JobStatus.RUNNING, eg.getState());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
new file mode 100644
index 0000000..effe417
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Validates that stop() calls are handled correctly.
+ */
+public class ExecutionGraphStopTest extends TestLogger {
+
+ /**
+ * Tests that STOP is only supported if all sources are stoppable.
+ */
+ @Test
+ public void testStopIfSourcesNotStoppable() throws Exception {
+ final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
+
+ try {
+ graph.stop();
+ fail("exception expected");
+ }
+ catch (StoppingException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Validates that stop is only sent to the sources.
+ *
+ * This test builds a simple job with two sources and two non-source vertices.
+ */
+ @Test
+ public void testStop() throws Exception {
+ final int sourcePar1 = 11;
+ final int sourcePar2 = 7;
+
+ final JobVertex source1 = new JobVertex("source 1");
+ source1.setInvokableClass(StoppableInvokable.class);
+ source1.setParallelism(sourcePar1);
+
+ final JobVertex source2 = new JobVertex("source 2");
+ source2.setInvokableClass(StoppableInvokable.class);
+ source2.setParallelism(sourcePar2);
+
+ final JobVertex nonSource1 = new JobVertex("non-source-1");
+ nonSource1.setInvokableClass(NoOpInvokable.class);
+ nonSource1.setParallelism(10);
+
+ final JobVertex nonSource2 = new JobVertex("non-source-2");
+ nonSource2.setInvokableClass(NoOpInvokable.class);
+ nonSource2.setParallelism(10);
+
+ nonSource1.connectNewDataSetAsInput(source1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ nonSource1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ nonSource2.connectNewDataSetAsInput(nonSource1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobID jid = new JobID();
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(
+ jid, source1, source2, nonSource1, nonSource2);
+
+ // we use different gateways for sources and non-sources to make sure the right ones
+ // get the RPC calls
+ final TaskManagerGateway sourceGateway = spy(new SimpleAckingTaskManagerGateway());
+ final TaskManagerGateway nonSourceGateway = spy(new SimpleAckingTaskManagerGateway());
+
+ // deploy source 1
+ for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+ ev.getCurrentExecutionAttempt().deployToSlot(slot);
+ }
+
+ // deploy source 2
+ for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+ ev.getCurrentExecutionAttempt().deployToSlot(slot);
+ }
+
+ // deploy non-source 1
+ for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+ ev.getCurrentExecutionAttempt().deployToSlot(slot);
+ }
+
+ // deploy non-source 2
+ for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+ ev.getCurrentExecutionAttempt().deployToSlot(slot);
+ }
+
+ eg.stop();
+
+ verify(sourceGateway, timeout(1000).times(sourcePar1 + sourcePar2)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+ verify(nonSourceGateway, times(0)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+
+ ExecutionGraphTestUtils.finishAllVertices(eg);
+ }
+
+ /**
+ * Tests that the stopping RPC call is sent upon stopping requests.
+ */
+ @Test
+ public void testStopRpc() throws Exception {
+ final JobID jid = new JobID();
+ final JobVertex vertex = new JobVertex("vertex");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(5);
+
+ final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(jid, vertex);
+ final Execution exec = graph.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+
+ final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
+ when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+ final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
+
+ exec.deployToSlot(slot);
+ exec.switchToRunning();
+ assertEquals(ExecutionState.RUNNING, exec.getState());
+
+ exec.stop();
+ assertEquals(ExecutionState.RUNNING, exec.getState());
+
+ verify(gateway, times(1)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+
+ exec.markFinished();
+ assertEquals(ExecutionState.FINISHED, exec.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
new file mode 100644
index 0000000..b3af005
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+/**
+ * Validates that suspending out of various states works correctly.
+ */
+public class ExecutionGraphSuspendTest {
+
+ /**
+ * Going into SUSPENDED out of CREATED should immediately cancel everything and
+ * not send out RPC calls.
+ */
+ @Test
+ public void testSuspendedOutOfCreated() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ assertEquals(JobStatus.CREATED, eg.getState());
+
+ // suspend
+
+ eg.suspend(new Exception("suspend"));
+
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ validateAllVerticesInState(eg, ExecutionState.CANCELED);
+ validateCancelRpcCalls(gateway, 0);
+
+ ensureCannotLeaveSuspendedState(eg, gateway);
+ }
+
+ /**
+ * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC calls.
+ */
+ @Test
+ public void testSuspendedOutOfDeploying() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
+
+ // suspend
+
+ eg.suspend(new Exception("suspend"));
+
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ validateAllVerticesInState(eg, ExecutionState.CANCELING);
+ validateCancelRpcCalls(gateway, parallelism);
+
+ ensureCannotLeaveSuspendedState(eg, gateway);
+ }
+
+ /**
+ * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC calls.
+ */
+ @Test
+ public void testSuspendedOutOfRunning() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ validateAllVerticesInState(eg, ExecutionState.RUNNING);
+
+ // suspend
+
+ eg.suspend(new Exception("suspend"));
+
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ validateAllVerticesInState(eg, ExecutionState.CANCELING);
+ validateCancelRpcCalls(gateway, parallelism);
+
+ ensureCannotLeaveSuspendedState(eg, gateway);
+ }
+
+ /**
+ * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls.
+ */
+ @Test
+ public void testSuspendedOutOfFailing() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.failGlobal(new Exception("fail global"));
+
+ assertEquals(JobStatus.FAILING, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+
+ // suspend
+ eg.suspend(new Exception("suspend"));
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+ ensureCannotLeaveSuspendedState(eg, gateway);
+ }
+
+ /**
+ * Suspending from FAILED should do nothing.
+ */
+ @Test
+ public void testSuspendedOutOfFailed() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.failGlobal(new Exception("fail global"));
+
+ assertEquals(JobStatus.FAILING, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+ assertEquals(JobStatus.FAILED, eg.getState());
+
+ // suspend
+ eg.suspend(new Exception("suspend"));
+
+ // still in failed state
+ assertEquals(JobStatus.FAILED, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+ }
+
+ /**
+ * Suspending from CANCELLING goes to SUSPENDED and sends no additional RPC calls.
+ */
+ @Test
+ public void testSuspendedOutOfCanceling() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.cancel();
+
+ assertEquals(JobStatus.CANCELLING, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+
+ // suspend
+ eg.suspend(new Exception("suspend"));
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+ ensureCannotLeaveSuspendedState(eg, gateway);
+ }
+
+ /**
+ * Suspending from CANCELED should do nothing.
+ */
+ @Test
+ public void testSuspendedOutOfCanceled() throws Exception {
+ final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+ final int parallelism = 10;
+ final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+ eg.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.cancel();
+
+ assertEquals(JobStatus.CANCELLING, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+ assertEquals(JobStatus.CANCELED, eg.getState());
+
+ // suspend
+ eg.suspend(new Exception("suspend"));
+
+ // still in canceled state
+ assertEquals(JobStatus.CANCELED, eg.getState());
+ validateCancelRpcCalls(gateway, parallelism);
+ }
+
+ /**
+ * Tests that we can suspend a job when in state RESTARTING.
+ */
+ @Test
+ public void testSuspendWhileRestarting() throws Exception {
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+ eg.scheduleForExecution();
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.failGlobal(new Exception("test"));
+ assertEquals(JobStatus.FAILING, eg.getState());
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ final Exception exception = new Exception("Suspended");
+
+ eg.suspend(exception);
+
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+ assertEquals(exception, eg.getFailureCause());
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManagerGateway gateway) {
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ reset(gateway);
+
+ eg.failGlobal(new Exception("fail"));
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ verifyNoMoreInteractions(gateway);
+
+ eg.cancel();
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ verifyNoMoreInteractions(gateway);
+
+ eg.suspend(new Exception("suspend again"));
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ verifyNoMoreInteractions(gateway);
+
+ for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+ assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber());
+ }
+ }
+
+ private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
+ for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+ assertEquals(expected, ev.getCurrentExecutionAttempt().getState());
+ }
+ }
+
+ private static void validateCancelRpcCalls(TaskManagerGateway gateway, int num) {
+ verify(gateway, times(num)).cancelTask(any(ExecutionAttemptID.class), any(Time.class));
+ }
+
+ private static ExecutionGraph createExecutionGraph(TaskManagerGateway gateway, int parallelism) throws Exception {
+ final JobID jobId = new JobID();
+
+ final JobVertex vertex = new JobVertex("vertex");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(parallelism);
+
+ final SlotProvider slotProvider = new SimpleSlotProvider(jobId, parallelism, gateway);
+
+ return ExecutionGraphTestUtils.createSimpleTestGraph(
+ jobId,
+ slotProvider,
+ new FixedDelayRestartStrategy(0, 0),
+ vertex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 73838dc..0d7e389 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,51 +18,158 @@
package org.apache.flink.runtime.executiongraph;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.concurrent.ScheduledExecutorService;
-
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
-import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A collection of utility methods for testing the ExecutionGraph and its related classes.
+ */
public class ExecutionGraphTestUtils {
- // --------------------------------------------------------------------------------------------
+ private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphTestUtils.class);
+
+ // ------------------------------------------------------------------------
+ // reaching states
+ // ------------------------------------------------------------------------
+
+ /**
+ * Waits until the job has reached a certain state.
+ *
+ * <p>This method is based on polling and might miss very fast state transitions!
+ *
+ * @param eg the execution graph whose job status is polled
+ * @param status the job status to wait for
+ * @param maxWaitMillis the maximum time to wait, in milliseconds; {@code 0} polls without a deadline
+ * @throws TimeoutException if the status was not observed before the deadline passed
+ */
+ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis)
+ throws TimeoutException {
+
+ checkNotNull(eg);
+ checkNotNull(status);
+ checkArgument(maxWaitMillis >= 0);
+
+ // this is a poor implementation - we may want to improve it eventually
+ final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+ // only time out while the status has NOT been observed; re-checking the clock after a
+ // successful poll could report a timeout even though the status was reached
+ while (eg.getState() != status) {
+ if (System.nanoTime() >= deadline) {
+ throw new TimeoutException("Job did not reach status " + status + " within " + maxWaitMillis + " ms");
+ }
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException ignored) {
+ // keep polling until the status is seen or the deadline passes
+ }
+ }
+ }
+
+ /**
+ * Waits until the Execution has reached a certain state.
+ *
+ * <p>This method is based on polling and might miss very fast state transitions!
+ *
+ * @param execution the execution whose state is polled
+ * @param state the execution state to wait for
+ * @param maxWaitMillis the maximum time to wait, in milliseconds; {@code 0} polls without a deadline
+ * @throws TimeoutException if the state was not observed before the deadline passed
+ */
+ public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis)
+ throws TimeoutException {
+
+ checkNotNull(execution);
+ checkNotNull(state);
+ checkArgument(maxWaitMillis >= 0);
+
+ // this is a poor implementation - we may want to improve it eventually
+ final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+ // only time out while the state has NOT been observed; re-checking the clock after a
+ // successful poll could report a timeout even though the state was reached
+ while (execution.getState() != state) {
+ if (System.nanoTime() >= deadline) {
+ throw new TimeoutException("Execution did not reach state " + state + " within " + maxWaitMillis + " ms");
+ }
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException ignored) {
+ // keep polling until the state is seen or the deadline passes
+ }
+ }
+ }
+
+ /**
+ * Takes all vertices in the given ExecutionGraph and switches their current
+ * execution to RUNNING.
+ */
+ public static void switchAllVerticesToRunning(ExecutionGraph eg) {
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
+ }
+
+ /**
+ * Takes all vertices in the given ExecutionGraph and completes the cancellation of
+ * their current execution (by calling {@code cancelingComplete()} on it).
+ */
+ public static void completeCancellingForAllVertices(ExecutionGraph eg) {
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
+ }
+ }
+
+ /**
+ * Takes all vertices in the given ExecutionGraph and marks their current
+ * execution as finished (by calling {@code markFinished()} on it).
+ */
+ public static void finishAllVertices(ExecutionGraph eg) {
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().markFinished();
+ }
+ }
+
+ // ------------------------------------------------------------------------
// state modifications
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
public static void setVertexState(ExecutionVertex vertex, ExecutionState state) {
try {
@@ -89,21 +196,105 @@ public class ExecutionGraphTestUtils {
throw new RuntimeException("Modifying the slot failed", e);
}
}
-
- public static void setGraphStatus(ExecutionGraph graph, JobStatus status) {
- try {
- Field f = ExecutionGraph.class.getDeclaredField("state");
- f.setAccessible(true);
- f.set(graph, status);
- }
- catch (Exception e) {
- throw new RuntimeException("Modifying the status failed", e);
+
+ // ------------------------------------------------------------------------
+ // Mocking Slots
+ // ------------------------------------------------------------------------
+
+ public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
+ final TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
+
+ final AllocatedSlot allocatedSlot = new AllocatedSlot(
+ new AllocationID(),
+ jid,
+ location,
+ 0,
+ ResourceProfile.UNKNOWN,
+ gateway);
+
+ return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
+ }
+
+ // ------------------------------------------------------------------------
+ // Mocking ExecutionGraph
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an execution graph with one job vertex of parallelism 10 that does no restarts.
+ */
+ public static ExecutionGraph createSimpleTestGraph() throws Exception {
+ return createSimpleTestGraph(new NoRestartStrategy());
+ }
+
+ /**
+ * Creates an execution graph with one job vertex of parallelism 10, using the given
+ * restart strategy.
+ */
+ public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception {
+ JobVertex vertex = new JobVertex("vertex");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(10);
+
+ return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
+ }
+
+ /**
+ * Creates an execution graph containing the given vertices.
+ *
+ * <p>The execution graph uses {@link NoRestartStrategy} as the restart strategy.
+ */
+ public static ExecutionGraph createSimpleTestGraph(JobID jid, JobVertex... vertices) throws Exception {
+ return createSimpleTestGraph(jid, new NoRestartStrategy(), vertices);
+ }
+
+ /**
+ * Creates an execution graph containing the given vertices and the given restart strategy.
+ */
+ public static ExecutionGraph createSimpleTestGraph(
+ JobID jid,
+ RestartStrategy restartStrategy,
+ JobVertex... vertices) throws Exception {
+
+ int numSlotsNeeded = 0;
+ for (JobVertex vertex : vertices) {
+ numSlotsNeeded += vertex.getParallelism();
}
+
+ SlotProvider slotProvider = new SimpleSlotProvider(jid, numSlotsNeeded);
+
+ return createSimpleTestGraph(jid, slotProvider, restartStrategy, vertices);
}
-
- // --------------------------------------------------------------------------------------------
+
+ public static ExecutionGraph createSimpleTestGraph(
+ JobID jid,
+ SlotProvider slotProvider,
+ RestartStrategy restartStrategy,
+ JobVertex... vertices) throws Exception {
+
+ checkNotNull(jid);
+ checkNotNull(restartStrategy);
+ checkNotNull(vertices);
+
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ new JobGraph(jid, "test job", vertices),
+ new Configuration(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ slotProvider,
+ ExecutionGraphTestUtils.class.getClassLoader(),
+ mock(CheckpointRecoveryFactory.class),
+ Time.seconds(10),
+ restartStrategy,
+ new UnregisteredMetricsGroup(),
+ 1,
+ TEST_LOGGER);
+ }
+
+ // ------------------------------------------------------------------------
// utility mocking methods
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
public static Instance getInstance(final TaskManagerGateway gateway) throws Exception {
return getInstance(gateway, 1);
@@ -188,17 +379,7 @@ public class ExecutionGraphTestUtils {
new NoRestartStrategy(),
new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
- ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
- AkkaUtils.getDefaultTimeout()));
-
- Answer<Void> noop = new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- return null;
- }
- };
-
- return ejv;
+ return spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout()));
}
public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
new file mode 100644
index 0000000..0797ef9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+public class ExecutionGraphVariousFailuesTest {
+
+ /**
+ * Tests that failing in state RESTARTING will retrigger the restarting logic. This means that
+ * the job only goes into the state FAILED after the restart strategy says the job is no longer
+ * restartable.
+ */
+ @Test
+ public void testFailureWhileRestarting() throws Exception {
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
+ eg.scheduleForExecution();
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.failGlobal(new Exception("Test 1"));
+ assertEquals(JobStatus.FAILING, eg.getState());
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+ // we should restart since we have two restart attempts left
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ eg.failGlobal(new Exception("Test 2"));
+
+ // we should restart since we have one restart attempt left
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ eg.failGlobal(new Exception("Test 3"));
+
+ // after depleting all our restart attempts we should go into FAILED
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ /**
+ * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
+ * immediately and sets the execution graph's state to FAILED.
+ */
+ @Test
+ public void testSuppressRestartFailureWhileRestarting() throws Exception {
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+ eg.scheduleForExecution();
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ eg.failGlobal(new Exception("test"));
+ assertEquals(JobStatus.FAILING, eg.getState());
+
+ ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ // suppress a possible restart
+ eg.failGlobal(new SuppressRestartsException(new Exception("Test")));
+
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ /**
+ * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
+ * id, will not fail the execution graph.
+ */
+ @Test
+ public void testFailingScheduleOrUpdateConsumers() throws Exception {
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+ eg.scheduleForExecution();
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ IntermediateResultPartitionID intermediateResultPartitionId = new IntermediateResultPartitionID();
+ ExecutionAttemptID producerId = new ExecutionAttemptID();
+ ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId);
+
+ // The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call
+ // should fail
+
+ try {
+ eg.scheduleOrUpdateConsumers(resultPartitionId);
+ fail("Expected ExecutionGraphException.");
+ } catch (ExecutionGraphException e) {
+ // we've expected this exception to occur
+ }
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 82561b2..fe6c3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -403,41 +404,25 @@ public class ExecutionVertexCancelTest {
}
@Test
- public void testSendCancelAndReceiveFail() {
- try {
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ public void testSendCancelAndReceiveFail() throws Exception {
+ final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
- final ActorGateway gateway = new CancelSequenceActorGateway(
- TestingUtils.defaultExecutionContext(),
- 1);
+ graph.scheduleForExecution();
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
+ assertEquals(JobStatus.RUNNING, graph.getState());
- Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ final ExecutionVertex[] vertices = graph.getVerticesTopologically().iterator().next().getTaskVertices();
+ assertEquals(vertices.length, graph.getRegisteredExecutions().size());
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ final Execution exec = vertices[3].getCurrentExecutionAttempt();
+ exec.cancel();
+ assertEquals(ExecutionState.CANCELING, exec.getState());
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ exec.markFailed(new Exception("test"));
+ assertTrue(exec.getState() == ExecutionState.FAILED || exec.getState() == ExecutionState.CANCELED);
- vertex.cancel();
-
- assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
-
- vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
-
- assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
-
- assertTrue(slot.isReleased());
-
- assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ assertTrue(exec.getAssignedResource().isCanceled());
+ assertEquals(vertices.length - 1, exec.getVertex().getExecutionGraph().getRegisteredExecutions().size());
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index eb85a33..0eed90d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -156,7 +156,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
// mimic a restart: all vertices get re-initialized without actually being executed
for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
- ejv.resetForNewExecution();
+ ejv.resetForNewExecution(System.currentTimeMillis(), graph.getGlobalModVersion());
}
// set new location for the sources and some state for the targets
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
deleted file mode 100644
index 5edaf91..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.BaseTestingActorGateway;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import scala.concurrent.ExecutionContext;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ExecutionVertex.class)
-public class ExecutionVertexStopTest extends TestLogger {
-
- private static ActorSystem system;
-
- private static boolean receivedStopSignal;
-
- @AfterClass
- public static void teardown(){
- if(system != null) {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
- }
-
- @Test
- public void testStop() throws Exception {
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- Execution executionMock = mock(Execution.class);
- whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
-
- vertex.stop();
-
- verify(executionMock).stop();
- }
-
- @Test
- public void testStopRpc() throws Exception {
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
- setVertexState(vertex, ExecutionState.SCHEDULED);
- assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-
- final ActorGateway gateway = new StopSequenceInstanceGateway(
- TestingUtils.defaultExecutionContext());
-
- Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- vertex.deployToSlot(slot);
-
- receivedStopSignal = false;
- vertex.stop();
- assertTrue(receivedStopSignal);
- }
-
- public static class StopSequenceInstanceGateway extends BaseTestingActorGateway {
- private static final long serialVersionUID = 7611571264006653627L;
-
- public StopSequenceInstanceGateway(ExecutionContext executionContext) {
- super(executionContext);
- }
-
- @Override
- public Object handleMessage(Object message) throws Exception {
- Object result = null;
- if (message instanceof TaskMessages.SubmitTask) {
- result = Acknowledge.get();
- } else if (message instanceof TaskMessages.StopTask) {
- result = Acknowledge.get();
- receivedStopSignal = true;
- }
-
- return result;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
new file mode 100644
index 0000000..8e0c04d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests that the {@link JobVertex#finalizeOnMaster(ClassLoader)} is called properly and
+ * only when the execution graph reaches a successful final state.
+ */
+public class FinalizeOnMasterTest extends TestLogger {
+
+ @Test
+ public void testFinalizeIsCalledUponSuccess() throws Exception {
+ final JobID jid = new JobID();
+
+ final JobVertex vertex1 = spy(new JobVertex("test vertex 1"));
+ vertex1.setInvokableClass(NoOpInvokable.class);
+ vertex1.setParallelism(3);
+
+ final JobVertex vertex2 = spy(new JobVertex("test vertex 2"));
+ vertex2.setInvokableClass(NoOpInvokable.class);
+ vertex2.setParallelism(2);
+
+ final ExecutionGraph eg = createSimpleTestGraph(jid, vertex1, vertex2);
+ eg.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ // move all vertices to finished state
+ ExecutionGraphTestUtils.finishAllVertices(eg);
+ assertEquals(JobStatus.FINISHED, eg.waitUntilTerminal());
+
+ verify(vertex1, times(1)).finalizeOnMaster(any(ClassLoader.class));
+ verify(vertex2, times(1)).finalizeOnMaster(any(ClassLoader.class));
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+
+ @Test
+ public void testFinalizeIsNotCalledUponFailure() throws Exception {
+ final JobID jid = new JobID();
+
+ final JobVertex vertex = spy(new JobVertex("test vertex 1"));
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(1);
+
+ final ExecutionGraph eg = createSimpleTestGraph(jid, vertex);
+ eg.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+ // fail the execution
+ final Execution exec = eg.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+ exec.fail(new Exception("test"));
+
+ assertEquals(JobStatus.FAILED, eg.waitUntilTerminal());
+
+ verify(vertex, times(0)).finalizeOnMaster(any(ClassLoader.class));
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+}
[5/5] flink git commit: [FLINK-6312] [build] Update curator version
to 2.12.0
Posted by se...@apache.org.
[FLINK-6312] [build] Update curator version to 2.12.0
The updated curator version includes a bugfix for a potential block
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aadfe45a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aadfe45a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aadfe45a
Branch: refs/heads/master
Commit: aadfe45a880d0f71f23c7a742ff5264e8dc14bf8
Parents: 821ec80
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Mon Apr 17 14:55:27 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200
----------------------------------------------------------------------
flink-contrib/flink-storm-examples/pom.xml | 4 ++++
.../checkpoint/ZooKeeperCompletedCheckpointStoreTest.java | 8 ++++----
pom.xml | 2 +-
3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index b9da214..d042dde 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -84,6 +84,10 @@ under the License.
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 8fc0f02..0d22dc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.utils.EnsurePath;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Executors;
@@ -128,12 +128,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
.delete()
.deletingChildrenIfNeeded()
.inBackground(any(BackgroundCallback.class), any(Executor.class))
- ).thenAnswer(new Answer<Pathable<Void>>() {
+ ).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
@Override
- public Pathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+ public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
- Pathable<Void> result = mock(Pathable.class);
+ ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f822198..71ad779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@ under the License.
<chill.version>0.7.4</chill.version>
<asm.version>5.0.4</asm.version>
<zookeeper.version>3.4.6</zookeeper.version>
- <curator.version>2.8.0</curator.version>
+ <curator.version>2.12.0</curator.version>
<jackson.version>2.7.4</jackson.version>
<metrics.version>3.1.0</metrics.version>
<junit.version>4.12</junit.version>