You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/11/19 18:23:36 UTC
[1/5] flink git commit: [FLINK-3019] [client] List restarting jobs
with scheduled jobs
Repository: flink
Updated Branches:
refs/heads/master d8ab8bcfe -> c64fe3e3e
[FLINK-3019] [client] List restarting jobs with scheduled jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9abbeace
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9abbeace
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9abbeace
Branch: refs/heads/master
Commit: 9abbeace9fba233a63e214b1bc21a66d44e67434
Parents: 5a86a0a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 14:11:44 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/client/CliFrontend.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9abbeace/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 070040c..6a79677 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -509,7 +509,8 @@ public class CliFrontend {
}
for (JobStatusMessage rj : jobs) {
- if (running && rj.getJobState().equals(JobStatus.RUNNING)) {
+ if (running && (rj.getJobState().equals(JobStatus.RUNNING)
+ || rj.getJobState().equals(JobStatus.RESTARTING))) {
runningJobs.add(rj);
}
if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
@@ -532,10 +533,10 @@ public class CliFrontend {
else {
Collections.sort(runningJobs, njec);
- System.out.println("------------------------ Running Jobs ------------------------");
+ System.out.println("------------------ Running/Restarting Jobs -------------------");
for (JobStatusMessage rj : runningJobs) {
System.out.println(df.format(new Date(rj.getStartTime()))
- + " : " + rj.getJobId() + " : " + rj.getJobName());
+ + " : " + rj.getJobId() + " : " + rj.getJobName() + " (" + rj.getJobState() + ")");
}
System.out.println("--------------------------------------------------------------");
}
[5/5] flink git commit: [FLINK-3028] [runtime-web] Show cancel button
for restarting jobs
Posted by uc...@apache.org.
[FLINK-3028] [runtime-web] Show cancel button for restarting jobs
This closes #1369
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c64fe3e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c64fe3e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c64fe3e3
Branch: refs/heads/master
Commit: c64fe3e3e54c0d267a30456455e5c701ae7f706e
Parents: 9abbeac
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 14:51:15 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:23:03 2015 +0100
----------------------------------------------------------------------
.../web-dashboard/app/partials/jobs/job.jade | 2 +-
.../web-dashboard/web/partials/jobs/job.html | 2 +-
.../web/partials/jobs/job.statistics.html | 40 --------------------
.../taskmanager/taskmanager.logfile.html | 18 ---------
.../taskmanager/taskmanager.stdout.html | 18 ---------
5 files changed, 2 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index 7ec97bd..ffc8962 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -39,7 +39,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
.navbar-info.last.first(ng-if="job.duration > -1" title="{{job.duration | humanizeDuration:false}}")
| {{job.duration | humanizeDuration:true}}
- .navbar-info.last.first(ng-if="job.state=='RUNNING' || job.state=='CREATED'")
+ .navbar-info.last.first(ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'")
span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
| Cancel
http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
index 5e80d1e..dbf6007 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -32,7 +32,7 @@ limitations under the License.
-
{{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
<div ng-if="job.duration > -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div>
- <div ng-if="job.state=='RUNNING' || job.state=='CREATED'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
+ <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
</nav>
<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
<ul class="nav nav-tabs">
http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
deleted file mode 100644
index 951cc1c..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
+++ /dev/null
@@ -1,40 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
--->
-<table class="table table-properties">
- <thead>
- <tr>
- <th colspan="2">Some statistics</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>Operator</td>
- <td>1</td>
- </tr>
- <tr>
- <td>Parallelism</td>
- <td>2</td>
- </tr>
- <tr>
- <td>Subtasks-per-instance</td>
- <td>3</td>
- </tr>
- </tbody>
-</table>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
deleted file mode 100644
index bd49dbc..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
deleted file mode 100644
index bd49dbc..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
\ No newline at end of file
[3/5] flink git commit: [FLINK-3011] [runtime] Disallow
ExecutionGraph state transition from FAILED to RESTARTING
Posted by uc...@apache.org.
[FLINK-3011] [runtime] Disallow ExecutionGraph state transition from FAILED to RESTARTING
Removes the possibility of going from the FAILED state back to RESTARTING. This transition was only used in a test
case, and it broke the terminal-state semantics of the FAILED state.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a402002d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a402002d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a402002d
Branch: refs/heads/master
Commit: a402002df7b6a3f21d84f9338b8b0afec1cc65ec
Parents: d8ab8bc
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Nov 16 16:18:20 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/ExecutionGraph.java | 6 ------
.../executiongraph/ExecutionGraphRestartTest.scala | 13 ++++---------
2 files changed, 4 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9430d80..aae0b7c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -746,12 +746,6 @@ public class ExecutionGraph implements Serializable {
public void restart() {
try {
- if (state == JobStatus.FAILED) {
- if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
- throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
- }
- }
-
synchronized (progressLock) {
if (state != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index e41d7ff..8fb3c4e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -43,7 +43,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
val NUM_TASKS = 31
"The execution graph" must {
- "be manually restartable" in {
+ "not be manually restartable" in {
try {
val instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext),
@@ -73,21 +73,16 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
eg.getState should equal(JobStatus.RUNNING)
eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-
+
for (vertex <- eg.getAllExecutionVertices().asScala) {
vertex.getCurrentExecutionAttempt().cancelingComplete()
}
-
+
eg.getState should equal(JobStatus.FAILED)
eg.restart()
- eg.getState should equal(JobStatus.RUNNING)
-
- for (vertex <- eg.getAllExecutionVertices.asScala) {
- vertex.getCurrentExecutionAttempt().markFinished()
- }
- eg.getState should equal(JobStatus.FINISHED)
+ eg.getState should equal(JobStatus.FAILED)
} catch {
case t: Throwable =>
t.printStackTrace()
[2/5] flink git commit: [FLINK-3011] [runtime] Fix cancel during
restart
Posted by uc...@apache.org.
[FLINK-3011] [runtime] Fix cancel during restart
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a86a0a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a86a0a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a86a0a1
Branch: refs/heads/master
Commit: 5a86a0a19dad28162ba7ddf27bd9e63468a5966c
Parents: ceabbd0
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:56:42 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 29 +++-
.../ExecutionGraphRestartTest.java | 141 +++++++++++++++++++
2 files changed, 169 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a86a0a1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aae0b7c..1e5d02c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable {
return;
}
}
+ // Executions are being canceled. Go into cancelling and wait for
+ // all vertices to be in their final state.
+ else if (current == JobStatus.FAILING) {
+ if (transitionState(current, JobStatus.CANCELLING)) {
+ return;
+ }
+ }
+ // All vertices have been cancelled and it's safe to directly go
+ // into the canceled state.
+ else if (current == JobStatus.RESTARTING) {
+ synchronized (progressLock) {
+ if (transitionState(current, JobStatus.CANCELED)) {
+ postRunCleanup();
+ progressLock.notifyAll();
+
+ LOG.info("Canceled during restart.");
+ return;
+ }
+ }
+ }
else {
// no need to treat other states
return;
@@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable {
public void restart() {
try {
synchronized (progressLock) {
- if (state != JobStatus.RESTARTING) {
+ JobStatus current = state;
+
+ if (current == JobStatus.CANCELED) {
+ LOG.info("Canceled job during restart. Aborting restart.");
+ return;
+ }
+ else if (current != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
+
if (scheduler == null) {
throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a86a0a1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 57b1829..a50aa2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
public class ExecutionGraphRestartTest {
@@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest {
fail("Failed to wait until all execution attempts left the state DEPLOYING.");
}
}
+
+ @Test
+ public void testCancelWhileRestarting() throws Exception {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ scheduler.newInstanceAvailable(instance);
+
+ // Blocking program
+ ExecutionGraph executionGraph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "TestJob",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+
+ JobVertex jobVertex = new JobVertex("NoOpInvokable");
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+ // We want to manually control the restart and delay
+ executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+ executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+ executionGraph.scheduleForExecution(scheduler);
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ // Kill the instance and wait for the job to restart
+ instance.markDead();
+
+ Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+ while (deadline.hasTimeLeft() &&
+ executionGraph.getState() != JobStatus.RESTARTING) {
+
+ Thread.sleep(100);
+ }
+
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+ // Canceling needs to abort the restart
+ executionGraph.cancel();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+
+ // The restart has been aborted
+ executionGraph.restart();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ }
+
+ @Test
+ public void testCancelWhileFailing() throws Exception {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ scheduler.newInstanceAvailable(instance);
+
+ // Blocking program
+ ExecutionGraph executionGraph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "TestJob",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+
+ // Spy on the graph
+ executionGraph = spy(executionGraph);
+
+ // Do nothing here, because we don't want to transition out of
+ // the FAILING state.
+ doNothing().when(executionGraph).jobVertexInFinalState();
+
+ JobVertex jobVertex = new JobVertex("NoOpInvokable");
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+ // We want to manually control the restart and delay
+ executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+ executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+ executionGraph.scheduleForExecution(scheduler);
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ // Kill the instance...
+ instance.markDead();
+
+ Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+ // ...and wait for all vertices to be in state FAILED. The
+ // jobVertexInFinalState does nothing, that's why we don't wait on the
+ // job status.
+ boolean success = false;
+ while (deadline.hasTimeLeft() && !success) {
+ success = true;
+ for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+ if (vertex.getExecutionState() != ExecutionState.FAILED) {
+ success = false;
+ Thread.sleep(100);
+ break;
+ }
+ }
+ }
+
+ // Still in failing
+ assertEquals(JobStatus.FAILING, executionGraph.getState());
+
+ // The cancel call needs to change the state to CANCELLING
+ executionGraph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+
+ // Unspy and finalize the job state
+ doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+
+ executionGraph.jobVertexInFinalState();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ }
}
[4/5] flink git commit: [FLINK-3011] [runtime,
tests] Translate ExecutionGraphRestartTest to Java
Posted by uc...@apache.org.
[FLINK-3011] [runtime, tests] Translate ExecutionGraphRestartTest to Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceabbd07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceabbd07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceabbd07
Branch: refs/heads/master
Commit: ceabbd07d54cd26747c6fd0d9ddc7fab5176b449
Parents: a402002
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:40:54 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100
----------------------------------------------------------------------
.../ExecutionGraphRestartTest.java | 161 ++++++++++++++++++
.../ExecutionGraphRestartTest.scala | 162 -------------------
2 files changed, 161 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ceabbd07/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..57b1829
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExecutionGraphRestartTest {
+
+ private final static int NUM_TASKS = 31;
+
+ @Test
+ public void testNotRestartManually() throws Exception {
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ scheduler.newInstanceAvailable(instance);
+
+ JobVertex sender = new JobVertex("Task");
+ sender.setInvokableClass(Tasks.NoOpInvokable.class);
+ sender.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "test job",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+ eg.setNumberOfRetriesLeft(0);
+ eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, eg.getState());
+
+ eg.scheduleForExecution(scheduler);
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
+ }
+
+ assertEquals(JobStatus.FAILED, eg.getState());
+
+ eg.restart();
+
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ @Test
+ public void testRestartAutomatically() throws Exception {
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ scheduler.newInstanceAvailable(instance);
+
+ JobVertex sender = new JobVertex("Task");
+ sender.setInvokableClass(Tasks.NoOpInvokable.class);
+ sender.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "Test job",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+ eg.setNumberOfRetriesLeft(1);
+ eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, eg.getState());
+
+ eg.scheduleForExecution(scheduler);
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+ assertEquals(JobStatus.FAILING, eg.getState());
+
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
+ }
+
+ FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+ // Wait for async restart
+ Deadline deadline = timeout.fromNow();
+ while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ // Wait for deploying after async restart
+ deadline = timeout.fromNow();
+ boolean success = false;
+
+ while (deadline.hasTimeLeft() && !success) {
+ success = true;
+
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
+ success = false;
+ Thread.sleep(100);
+ break;
+ }
+ }
+ }
+
+ if (deadline.hasTimeLeft()) {
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().markFinished();
+ }
+
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ fail("Failed to wait until all execution attempts left the state DEPLOYING.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ceabbd07/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
deleted file mode 100644
index 8fb3c4e..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.testingUtils.TestingUtils
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{Matchers, WordSpecLike}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
-
-@RunWith(classOf[JUnitRunner])
-class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
-
- val NUM_TASKS = 31
-
- "The execution graph" must {
- "not be manually restartable" in {
- try {
- val instance = ExecutionGraphTestUtils.getInstance(
- new SimpleActorGateway(TestingUtils.directExecutionContext),
- NUM_TASKS)
-
- val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
- scheduler.newInstanceAvailable(instance)
-
- val sender = new JobVertex("Task")
- sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
- sender.setParallelism(NUM_TASKS)
-
- val jobGraph = new JobGraph("Pointwise job", sender)
-
- val eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext,
- new JobID(),
- "test job",
- new Configuration(),
- AkkaUtils.getDefaultTimeout)
- eg.setNumberOfRetriesLeft(0)
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
- eg.getState should equal(JobStatus.CREATED)
-
- eg.scheduleForExecution(scheduler)
- eg.getState should equal(JobStatus.RUNNING)
-
- eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-
- for (vertex <- eg.getAllExecutionVertices().asScala) {
- vertex.getCurrentExecutionAttempt().cancelingComplete()
- }
-
- eg.getState should equal(JobStatus.FAILED)
-
- eg.restart()
-
- eg.getState should equal(JobStatus.FAILED)
- } catch {
- case t: Throwable =>
- t.printStackTrace()
- fail(t.getMessage)
- }
- }
-
- "restart itself automatically" in {
- try {
- val instance = ExecutionGraphTestUtils.getInstance(
- new SimpleActorGateway(TestingUtils.directExecutionContext),
- NUM_TASKS)
-
- val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
- scheduler.newInstanceAvailable(instance)
-
- val sender = new JobVertex("Task")
- sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
- sender.setParallelism(NUM_TASKS)
-
- val jobGraph = new JobGraph("Pointwise job", sender)
-
- val eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext,
- new JobID(),
- "Test job",
- new Configuration(),
- AkkaUtils.getDefaultTimeout)
- eg.setNumberOfRetriesLeft(1)
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
- eg.getState should equal(JobStatus.CREATED)
-
- eg.scheduleForExecution(scheduler)
- eg.getState should equal(JobStatus.RUNNING)
-
- eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
- eg.getState should equal(JobStatus.FAILING)
-
- for (vertex <- eg.getAllExecutionVertices.asScala) {
- vertex.getCurrentExecutionAttempt().cancelingComplete()
- }
-
- val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
-
- // Wait for async restart
- var deadline = timeout.fromNow
- while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
- Thread.sleep(100)
- }
-
- eg.getState should equal(JobStatus.RUNNING)
-
- // Wait for deploying after async restart
- deadline = timeout.fromNow
- while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
- _.getCurrentExecutionAttempt.getAssignedResource == null)) {
- Thread.sleep(100)
- }
-
- if (deadline.hasTimeLeft()) {
- for (vertex <- eg.getAllExecutionVertices.asScala) {
- vertex.getCurrentExecutionAttempt().markFinished()
- }
-
- eg.getState() should equal(JobStatus.FINISHED)
- } else {
- fail("Failed to wait until all execution attempts left the state DEPLOYING.")
- }
- } catch {
- case t: Throwable =>
- t.printStackTrace()
- fail(t.getMessage)
- }
- }
- }
-}