You are viewing a plain text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/11/19 18:23:36 UTC

[1/5] flink git commit: [FLINK-3019] [client] List restarting jobs with scheduled jobs

Repository: flink
Updated Branches:
  refs/heads/master d8ab8bcfe -> c64fe3e3e


[FLINK-3019] [client] List restarting jobs with scheduled jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9abbeace
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9abbeace
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9abbeace

Branch: refs/heads/master
Commit: 9abbeace9fba233a63e214b1bc21a66d44e67434
Parents: 5a86a0a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 14:11:44 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/client/CliFrontend.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9abbeace/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 070040c..6a79677 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -509,7 +509,8 @@ public class CliFrontend {
 				}
 
 				for (JobStatusMessage rj : jobs) {
-					if (running && rj.getJobState().equals(JobStatus.RUNNING)) {
+					if (running && (rj.getJobState().equals(JobStatus.RUNNING)
+							|| rj.getJobState().equals(JobStatus.RESTARTING))) {
 						runningJobs.add(rj);
 					}
 					if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
@@ -532,10 +533,10 @@ public class CliFrontend {
 					else {
 						Collections.sort(runningJobs, njec);
 
-						System.out.println("------------------------ Running Jobs ------------------------");
+						System.out.println("------------------ Running/Restarting Jobs -------------------");
 						for (JobStatusMessage rj : runningJobs) {
 							System.out.println(df.format(new Date(rj.getStartTime()))
-									+ " : " + rj.getJobId() + " : " + rj.getJobName());
+									+ " : " + rj.getJobId() + " : " + rj.getJobName() + " (" + rj.getJobState() + ")");
 						}
 						System.out.println("--------------------------------------------------------------");
 					}


[5/5] flink git commit: [FLINK-3028] [runtime-web] Show cancel button for restarting jobs

Posted by uc...@apache.org.
[FLINK-3028] [runtime-web] Show cancel button for restarting jobs

This closes #1369


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c64fe3e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c64fe3e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c64fe3e3

Branch: refs/heads/master
Commit: c64fe3e3e54c0d267a30456455e5c701ae7f706e
Parents: 9abbeac
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 14:51:15 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:23:03 2015 +0100

----------------------------------------------------------------------
 .../web-dashboard/app/partials/jobs/job.jade    |  2 +-
 .../web-dashboard/web/partials/jobs/job.html    |  2 +-
 .../web/partials/jobs/job.statistics.html       | 40 --------------------
 .../taskmanager/taskmanager.logfile.html        | 18 ---------
 .../taskmanager/taskmanager.stdout.html         | 18 ---------
 5 files changed, 2 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index 7ec97bd..ffc8962 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -39,7 +39,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
   .navbar-info.last.first(ng-if="job.duration > -1" title="{{job.duration | humanizeDuration:false}}")
     | {{job.duration | humanizeDuration:true}}
 
-  .navbar-info.last.first(ng-if="job.state=='RUNNING' || job.state=='CREATED'")
+  .navbar-info.last.first(ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'")
     span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
       | Cancel
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
index 5e80d1e..dbf6007 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -32,7 +32,7 @@ limitations under the License.
       - 
       {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
   <div ng-if="job.duration &gt; -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div>
-  <div ng-if="job.state=='RUNNING' || job.state=='CREATED'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
+  <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
 </nav>
 <nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
   <ul class="nav nav-tabs">

http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
deleted file mode 100644
index 951cc1c..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
+++ /dev/null
@@ -1,40 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
--->
-<table class="table table-properties">
-  <thead>
-    <tr>
-      <th colspan="2">Some statistics</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>Operator</td>
-      <td>1</td>
-    </tr>
-    <tr>
-      <td>Parallelism</td>
-      <td>2</td>
-    </tr>
-    <tr>
-      <td>Subtasks-per-instance</td>
-      <td>3</td>
-    </tr>
-  </tbody>
-</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
deleted file mode 100644
index bd49dbc..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.logfile.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c64fe3e3/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
deleted file mode 100644
index bd49dbc..0000000
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
\ No newline at end of file


[3/5] flink git commit: [FLINK-3011] [runtime] Disallow ExecutionGraph state transition from FAILED to RESTARTING

Posted by uc...@apache.org.
[FLINK-3011] [runtime] Disallow ExecutionGraph state transition from FAILED to RESTARTING

Removes the possibility of going from the FAILED state back to RESTARTING. This was only used in a test
case, and it broke the terminal-state semantics of the FAILED state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a402002d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a402002d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a402002d

Branch: refs/heads/master
Commit: a402002df7b6a3f21d84f9338b8b0afec1cc65ec
Parents: d8ab8bc
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Nov 16 16:18:20 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/ExecutionGraph.java   |  6 ------
 .../executiongraph/ExecutionGraphRestartTest.scala     | 13 ++++---------
 2 files changed, 4 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9430d80..aae0b7c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -746,12 +746,6 @@ public class ExecutionGraph implements Serializable {
 
 	public void restart() {
 		try {
-			if (state == JobStatus.FAILED) {
-				if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
-					throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
-				}
-			}
-
 			synchronized (progressLock) {
 				if (state != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");

http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index e41d7ff..8fb3c4e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -43,7 +43,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
   val NUM_TASKS = 31
 
   "The execution graph" must {
-    "be manually restartable" in {
+    "not be manually restartable" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
           new SimpleActorGateway(TestingUtils.directExecutionContext),
@@ -73,21 +73,16 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-        
+
         for (vertex <- eg.getAllExecutionVertices().asScala) {
           vertex.getCurrentExecutionAttempt().cancelingComplete()
         }
-        
+
         eg.getState should equal(JobStatus.FAILED)
 
         eg.restart()
-        eg.getState should equal(JobStatus.RUNNING)
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().markFinished()
-        }
 
-        eg.getState should equal(JobStatus.FINISHED)
+        eg.getState should equal(JobStatus.FAILED)
       } catch {
         case t: Throwable =>
           t.printStackTrace()


[2/5] flink git commit: [FLINK-3011] [runtime] Fix cancel during restart

Posted by uc...@apache.org.
[FLINK-3011] [runtime] Fix cancel during restart


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a86a0a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a86a0a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a86a0a1

Branch: refs/heads/master
Commit: 5a86a0a19dad28162ba7ddf27bd9e63468a5966c
Parents: ceabbd0
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:56:42 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  29 +++-
 .../ExecutionGraphRestartTest.java              | 141 +++++++++++++++++++
 2 files changed, 169 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a86a0a1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aae0b7c..1e5d02c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable {
 					return;
 				}
 			}
+			// Executions are being canceled. Go into cancelling and wait for
+			// all vertices to be in their final state.
+			else if (current == JobStatus.FAILING) {
+				if (transitionState(current, JobStatus.CANCELLING)) {
+					return;
+				}
+			}
+			// All vertices have been cancelled and it's safe to directly go
+			// into the canceled state.
+			else if (current == JobStatus.RESTARTING) {
+				synchronized (progressLock) {
+					if (transitionState(current, JobStatus.CANCELED)) {
+						postRunCleanup();
+						progressLock.notifyAll();
+
+						LOG.info("Canceled during restart.");
+						return;
+					}
+				}
+			}
 			else {
 				// no need to treat other states
 				return;
@@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable {
 	public void restart() {
 		try {
 			synchronized (progressLock) {
-				if (state != JobStatus.RESTARTING) {
+				JobStatus current = state;
+
+				if (current == JobStatus.CANCELED) {
+					LOG.info("Canceled job during restart. Aborting restart.");
+					return;
+				}
+				else if (current != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
+
 				if (scheduler == null) {
 					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/5a86a0a1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 57b1829..a50aa2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
 
 public class ExecutionGraphRestartTest {
 
@@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest {
 			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
 		}
 	}
+
+	@Test
+	public void testCancelWhileRestarting() throws Exception {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		scheduler.newInstanceAvailable(instance);
+
+		// Blocking program
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"TestJob",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+
+		JobVertex jobVertex = new JobVertex("NoOpInvokable");
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+		// We want to manually control the restart and delay
+		executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+		executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		// Kill the instance and wait for the job to restart
+		instance.markDead();
+
+		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+		while (deadline.hasTimeLeft() &&
+				executionGraph.getState() != JobStatus.RESTARTING) {
+
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		// Canceling needs to abort the restart
+		executionGraph.cancel();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+
+		// The restart has been aborted
+		executionGraph.restart();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+	}
+
+	@Test
+	public void testCancelWhileFailing() throws Exception {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		scheduler.newInstanceAvailable(instance);
+
+		// Blocking program
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"TestJob",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+
+		// Spy on the graph
+		executionGraph = spy(executionGraph);
+
+		// Do nothing here, because we don't want to transition out of
+		// the FAILING state.
+		doNothing().when(executionGraph).jobVertexInFinalState();
+
+		JobVertex jobVertex = new JobVertex("NoOpInvokable");
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+		// We want to manually control the restart and delay
+		executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+		executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		// Kill the instance...
+		instance.markDead();
+
+		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+		// ...and wait for all vertices to be in state FAILED. The
+		// jobVertexInFinalState does nothing, that's why we don't wait on the
+		// job status.
+		boolean success = false;
+		while (deadline.hasTimeLeft() && !success) {
+			success = true;
+			for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+				if (vertex.getExecutionState() != ExecutionState.FAILED) {
+					success = false;
+					Thread.sleep(100);
+					break;
+				}
+			}
+		}
+
+		// Still in failing
+		assertEquals(JobStatus.FAILING, executionGraph.getState());
+
+		// The cancel call needs to change the state to CANCELLING
+		executionGraph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+
+		// Unspy and finalize the job state
+		doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+
+		executionGraph.jobVertexInFinalState();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+	}
 }


[4/5] flink git commit: [FLINK-3011] [runtime, tests] Translate ExecutionGraphRestartTest to Java

Posted by uc...@apache.org.
[FLINK-3011] [runtime, tests] Translate ExecutionGraphRestartTest to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceabbd07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceabbd07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceabbd07

Branch: refs/heads/master
Commit: ceabbd07d54cd26747c6fd0d9ddc7fab5176b449
Parents: a402002
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:40:54 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 161 ++++++++++++++++++
 .../ExecutionGraphRestartTest.scala             | 162 -------------------
 2 files changed, 161 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceabbd07/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..57b1829
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExecutionGraphRestartTest {
+
+	private final static int NUM_TASKS = 31;
+
+	@Test
+	public void testNotRestartManually() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+		eg.setNumberOfRetriesLeft(0);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+
+		eg.restart();
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	@Test
+	public void testRestartAutomatically() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"Test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+		eg.setNumberOfRetriesLeft(1);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+		// Wait for async restart
+		Deadline deadline = timeout.fromNow();
+		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Wait for deploying after async restart
+		deadline = timeout.fromNow();
+		boolean success = false;
+
+		while (deadline.hasTimeLeft() && !success) {
+			success = true;
+
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
+					success = false;
+					Thread.sleep(100);
+					break;
+				}
+			}
+		}
+
+		if (deadline.hasTimeLeft()) {
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				vertex.getCurrentExecutionAttempt().markFinished();
+			}
+
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		else {
+			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceabbd07/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
deleted file mode 100644
index 8fb3c4e..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.testingUtils.TestingUtils
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{Matchers, WordSpecLike}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
-
-@RunWith(classOf[JUnitRunner])
-class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
-
-  val NUM_TASKS = 31
-
-  "The execution graph" must {
-    "not be manually restartable" in {
-      try {
-        val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext),
-            NUM_TASKS)
-
-        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
-        scheduler.newInstanceAvailable(instance)
-
-        val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
-        sender.setParallelism(NUM_TASKS)
-
-        val jobGraph = new JobGraph("Pointwise job", sender)
-
-        val eg = new ExecutionGraph(
-          TestingUtils.defaultExecutionContext,
-          new JobID(),
-          "test job",
-          new Configuration(),
-          AkkaUtils.getDefaultTimeout)
-        eg.setNumberOfRetriesLeft(0)
-        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
-        eg.getState should equal(JobStatus.CREATED)
-
-        eg.scheduleForExecution(scheduler)
-        eg.getState should equal(JobStatus.RUNNING)
-
-        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-
-        for (vertex <- eg.getAllExecutionVertices().asScala) {
-          vertex.getCurrentExecutionAttempt().cancelingComplete()
-        }
-
-        eg.getState should equal(JobStatus.FAILED)
-
-        eg.restart()
-
-        eg.getState should equal(JobStatus.FAILED)
-      } catch {
-        case t: Throwable =>
-          t.printStackTrace()
-          fail(t.getMessage)
-      }
-    }
-
-    "restart itself automatically" in {
-      try {
-        val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext),
-          NUM_TASKS)
-
-        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
-        scheduler.newInstanceAvailable(instance)
-
-        val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
-        sender.setParallelism(NUM_TASKS)
-
-        val jobGraph = new JobGraph("Pointwise job", sender)
-
-        val eg = new ExecutionGraph(
-          TestingUtils.defaultExecutionContext,
-          new JobID(),
-          "Test job",
-          new Configuration(),
-          AkkaUtils.getDefaultTimeout)
-        eg.setNumberOfRetriesLeft(1)
-        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
-        eg.getState should equal(JobStatus.CREATED)
-
-        eg.scheduleForExecution(scheduler)
-        eg.getState should equal(JobStatus.RUNNING)
-
-        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-        eg.getState should equal(JobStatus.FAILING)
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().cancelingComplete()
-        }
-
-        val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
-
-        // Wait for async restart
-        var deadline = timeout.fromNow
-        while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
-          Thread.sleep(100)
-        }
-
-        eg.getState should equal(JobStatus.RUNNING)
-
-        // Wait for deploying after async restart
-        deadline = timeout.fromNow
-        while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
-          _.getCurrentExecutionAttempt.getAssignedResource == null)) {
-          Thread.sleep(100)
-        }
-
-        if (deadline.hasTimeLeft()) {
-          for (vertex <- eg.getAllExecutionVertices.asScala) {
-            vertex.getCurrentExecutionAttempt().markFinished()
-          }
-
-          eg.getState() should equal(JobStatus.FINISHED)
-        } else {
-          fail("Failed to wait until all execution attempts left the state DEPLOYING.")
-        }
-      } catch {
-        case t: Throwable =>
-          t.printStackTrace()
-          fail(t.getMessage)
-      }
-    }
-  }
-}