You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:25 UTC

[61/63] [abbrv] git commit: Adjust the web frontend display of the jobs; tolerate concurrent scheduling attempts

Adjust the web frontend display of the jobs
Tolerate concurrent scheduling attempts


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9803657a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9803657a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9803657a

Branch: refs/heads/master
Commit: 9803657a93f3a32618796b8a5204749e459fb5e9
Parents: a00ef7a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 19:23:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:21:02 2014 +0200

----------------------------------------------------------------------
 .../web-docs-infoserver/js/analyzer.js          | 27 ++++++++------------
 .../web-docs-infoserver/js/taskmanager.js       |  5 ++--
 .../flink/runtime/executiongraph/Execution.java | 10 +++-----
 .../runtime/executiongraph/ExecutionGraph.java  |  1 +
 .../jobmanager/archive/MemoryArchivist.java     |  4 +--
 .../jobmanager/web/JobmanagerInfoServlet.java   | 15 +++++------
 .../runtime/jobmanager/web/JsonFactory.java     |  4 +--
 .../ExecutionVertexCancelTest.java              |  5 ++--
 8 files changed, 31 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js b/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
index e96f420..f93db1c 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
@@ -230,18 +230,18 @@ function analyzeGroupvertexTime(json) {
 
 		data.addRows([
 						[
-							new Date(json.verticetimes[vertex.vertexid].READY),
-							new Date(json.verticetimes[vertex.vertexid].STARTING),
-							"ready",
+							new Date(json.verticetimes[vertex.vertexid].SCHEDULED),
+							new Date(json.verticetimes[vertex.vertexid].DEPLOYING),
+							"scheduled",
 							vertex.vertexinstancename+ "_" + cnt,
-							"ready" 
+							"scheduled" 
 						],
 						[
-							new Date(json.verticetimes[vertex.vertexid].STARTING),
+							new Date(json.verticetimes[vertex.vertexid].DEPLOYING),
 							new Date(json.verticetimes[vertex.vertexid].RUNNING),
-							"starting",
+							"deploying",
 							vertex.vertexinstancename+ "_"+ cnt,
-							"starting" 
+							"deploying" 
 						] 
 					]);
 
@@ -249,17 +249,10 @@ function analyzeGroupvertexTime(json) {
 			data.addRows([
 							[
 								new Date(json.verticetimes[vertex.vertexid].RUNNING),
-								new Date(json.verticetimes[vertex.vertexid].FINISHING),
-								" running",
-								vertex.vertexinstancename + "_" + cnt,
-								"running" 
-							],
-							[
-								new Date(json.verticetimes[vertex.vertexid].FINISHING),
 								new Date(json.verticetimes[vertex.vertexid].FINISHED),
-								"finishing",
+								"running",
 								vertex.vertexinstancename + "_" + cnt,
-								"finishing" 
+								"running" 
 							] 
 						]);
 
@@ -310,4 +303,4 @@ function getSelectedRow(timeline) {
 		}
 	}
 	return row;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
index 2bfbed2..7d27d4e 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
@@ -28,11 +28,11 @@ function loadTaskmanagers(json) {
 	$("#taskmanagerTable").empty();
 	var table = "<table class=\"table table-bordered table-hover table-striped\">";
 	table += "<tr><th>Node</th><th>Ipc Port</th><th>Data Port</th><th>Seconds since last Heartbeat</th>" +
-			"<th>Number of Slots</th><th>Available Slots</th><th>CPU Cores</th><th>Physical Memory (mb)</th><th>TaskManager Heapsize (mb)</th></tr>";
+			"<th>Number of Slots</th><th>Available Slots</th><th>CPU Cores</th><th>Physical Memory (mb)</th><th>TaskManager Heapsize (mb)</th><th>Managed Memory (mb)</th></tr>";
 	for (var i = 0; i < json.taskmanagers.length; i++) {
 		var tm = json.taskmanagers[i]
 		table += "<tr><td>"+tm.inetAdress+"</td><td>"+tm.ipcPort+"</td><td>"+tm.dataPort+"</td><td>"+tm.timeSinceLastHeartbeat+"</td>" +
-				"<td>"+tm.slotsNumber+"</td><td>"+tm.freeSlots+"</td><td>"+tm.cpuCores+"</td><td>"+tm.physicalMemory+"</td><td>"+tm.freeMemory+"</td></tr>";
+				"<td>"+tm.slotsNumber+"</td><td>"+tm.freeSlots+"</td><td>"+tm.cpuCores+"</td><td>"+tm.physicalMemory+"</td><td>"+tm.freeMemory+"</td><<td>"+tm.managedMemory+"</td></tr>";
 	}
 	table += "</table>";
 	$("#taskmanagerTable").append(table);
@@ -47,3 +47,4 @@ function pollTaskmanagers() {
 		pollTaskmanagers();
 	}, 10000);
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b002d8d..37394bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -171,7 +171,7 @@ public class Execution {
 		if (locationConstraint != null && sharingGroup == null) {
 			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
 		}
-				
+		
 		if (transitionState(CREATED, SCHEDULED)) {
 			
 			ScheduledUnit toSchedule = locationConstraint == null ?
@@ -213,13 +213,9 @@ public class Execution {
 				}
 			}
 		}
-		else if (this.state == CANCELED) {
-			// this can occur very rarely through heavy races. if the task was canceled, we do not
-			// schedule it
-			return;
-		}
 		else {
-			throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
+			// call race, already deployed
+			return;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3ac3386..0c11170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -139,6 +139,7 @@ public class ExecutionGraph {
 		this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
 		
 		this.stateTimestamps = new long[JobStatus.values().length];
+		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
index 2579b75..76d799f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
@@ -103,8 +103,8 @@ public class MemoryArchivist implements ArchiveListener {
 	public RecentJobEvent getJob(JobID jobId) {
 		synchronized (lock) {
 			return oldJobs.get(jobId);
-		}}
-	
+		}
+	}
 	
 	@Override
 	public List<AbstractEvent> getEvents(JobID jobID) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index f4c67df..9c8ae27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -33,13 +33,13 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -89,7 +89,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
@@ -464,28 +464,27 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			boolean first = true;
 			for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
 				
-				int num = 0;
 				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
 					
+					Execution exec = vertex.getCurrentExecutionAttempt();
+					
 					if(first) {
 						first = false;
 					} else {
 						wrt.write(","); }
 					
-					wrt.write("\""+jobVertex.getJobVertex()+"-"+num +"\": {");
-					wrt.write("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
+					wrt.write("\""+exec.getAttemptId() +"\": {");
+					wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
 					wrt.write("\"vertexname\": \"" + vertex + "\",");
 					wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
 					wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-					wrt.write("\"STARTING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+					wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
 					wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
 					wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
 					wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
 					wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
 					wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
 					wrt.write("}");
-					
-					num++;
 				}
 				
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 745a9f8..6ac8613 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -35,7 +35,7 @@ public class JsonFactory {
 	public static String toJson(ExecutionVertex vertex) {
 		StringBuilder json = new StringBuilder("");
 		json.append("{");
-		json.append("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
+		json.append("\"vertexid\": \"" + vertex.getCurrentExecutionAttempt().getAttemptId() + "\",");
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
@@ -66,7 +66,7 @@ public class JsonFactory {
 		
 		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
 		
-		for(int j = 0; j < vertices.length; j++) {
+		for (int j = 0; j < vertices.length; j++) {
 			ExecutionVertex vertex = vertices[j];
 			
 			json.append(toJson(vertex));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index a351209..223f8ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -516,9 +516,10 @@ public class ExecutionVertexCancelTest {
 				
 				Scheduler scheduler = mock(Scheduler.class);
 				vertex.scheduleForExecution(scheduler, false);
-				fail("Method should throw an exception");
 			}
-			catch (IllegalStateException e) {}
+			catch (Exception e) {
+				fail("should not throw an exception");
+			}
 			
 			
 			// deploying while in canceling state is illegal (should immediately go to canceled)