You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:25 UTC
[61/63] [abbrv] git commit: Adjust the web frontend display of the
jobs; tolerate concurrent scheduling attempts
Adjust the web frontend display of the jobs
Tolerate concurrent scheduling attempts
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9803657a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9803657a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9803657a
Branch: refs/heads/master
Commit: 9803657a93f3a32618796b8a5204749e459fb5e9
Parents: a00ef7a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 19:23:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:21:02 2014 +0200
----------------------------------------------------------------------
.../web-docs-infoserver/js/analyzer.js | 27 ++++++++------------
.../web-docs-infoserver/js/taskmanager.js | 5 ++--
.../flink/runtime/executiongraph/Execution.java | 10 +++-----
.../runtime/executiongraph/ExecutionGraph.java | 1 +
.../jobmanager/archive/MemoryArchivist.java | 4 +--
.../jobmanager/web/JobmanagerInfoServlet.java | 15 +++++------
.../runtime/jobmanager/web/JsonFactory.java | 4 +--
.../ExecutionVertexCancelTest.java | 5 ++--
8 files changed, 31 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js b/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
index e96f420..f93db1c 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/analyzer.js
@@ -230,18 +230,18 @@ function analyzeGroupvertexTime(json) {
data.addRows([
[
- new Date(json.verticetimes[vertex.vertexid].READY),
- new Date(json.verticetimes[vertex.vertexid].STARTING),
- "ready",
+ new Date(json.verticetimes[vertex.vertexid].SCHEDULED),
+ new Date(json.verticetimes[vertex.vertexid].DEPLOYING),
+ "scheduled",
vertex.vertexinstancename+ "_" + cnt,
- "ready"
+ "scheduled"
],
[
- new Date(json.verticetimes[vertex.vertexid].STARTING),
+ new Date(json.verticetimes[vertex.vertexid].DEPLOYING),
new Date(json.verticetimes[vertex.vertexid].RUNNING),
- "starting",
+ "deploying",
vertex.vertexinstancename+ "_"+ cnt,
- "starting"
+ "deploying"
]
]);
@@ -249,17 +249,10 @@ function analyzeGroupvertexTime(json) {
data.addRows([
[
new Date(json.verticetimes[vertex.vertexid].RUNNING),
- new Date(json.verticetimes[vertex.vertexid].FINISHING),
- " running",
- vertex.vertexinstancename + "_" + cnt,
- "running"
- ],
- [
- new Date(json.verticetimes[vertex.vertexid].FINISHING),
new Date(json.verticetimes[vertex.vertexid].FINISHED),
- "finishing",
+ "running",
vertex.vertexinstancename + "_" + cnt,
- "finishing"
+ "running"
]
]);
@@ -310,4 +303,4 @@ function getSelectedRow(timeline) {
}
}
return row;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
index 2bfbed2..7d27d4e 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/taskmanager.js
@@ -28,11 +28,11 @@ function loadTaskmanagers(json) {
$("#taskmanagerTable").empty();
var table = "<table class=\"table table-bordered table-hover table-striped\">";
table += "<tr><th>Node</th><th>Ipc Port</th><th>Data Port</th><th>Seconds since last Heartbeat</th>" +
- "<th>Number of Slots</th><th>Available Slots</th><th>CPU Cores</th><th>Physical Memory (mb)</th><th>TaskManager Heapsize (mb)</th></tr>";
+ "<th>Number of Slots</th><th>Available Slots</th><th>CPU Cores</th><th>Physical Memory (mb)</th><th>TaskManager Heapsize (mb)</th><th>Managed Memory (mb)</th></tr>";
for (var i = 0; i < json.taskmanagers.length; i++) {
var tm = json.taskmanagers[i]
table += "<tr><td>"+tm.inetAdress+"</td><td>"+tm.ipcPort+"</td><td>"+tm.dataPort+"</td><td>"+tm.timeSinceLastHeartbeat+"</td>" +
- "<td>"+tm.slotsNumber+"</td><td>"+tm.freeSlots+"</td><td>"+tm.cpuCores+"</td><td>"+tm.physicalMemory+"</td><td>"+tm.freeMemory+"</td></tr>";
+ "<td>"+tm.slotsNumber+"</td><td>"+tm.freeSlots+"</td><td>"+tm.cpuCores+"</td><td>"+tm.physicalMemory+"</td><td>"+tm.freeMemory+"</td><<td>"+tm.managedMemory+"</td></tr>";
}
table += "</table>";
$("#taskmanagerTable").append(table);
@@ -47,3 +47,4 @@ function pollTaskmanagers() {
pollTaskmanagers();
}, 10000);
}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b002d8d..37394bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -171,7 +171,7 @@ public class Execution {
if (locationConstraint != null && sharingGroup == null) {
throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
}
-
+
if (transitionState(CREATED, SCHEDULED)) {
ScheduledUnit toSchedule = locationConstraint == null ?
@@ -213,13 +213,9 @@ public class Execution {
}
}
}
- else if (this.state == CANCELED) {
- // this can occur very rarely through heavy races. if the task was canceled, we do not
- // schedule it
- return;
- }
else {
- throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
+ // call race, already deployed
+ return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3ac3386..0c11170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -139,6 +139,7 @@ public class ExecutionGraph {
this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
this.stateTimestamps = new long[JobStatus.values().length];
+ this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
index 2579b75..76d799f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist.java
@@ -103,8 +103,8 @@ public class MemoryArchivist implements ArchiveListener {
public RecentJobEvent getJob(JobID jobId) {
synchronized (lock) {
return oldJobs.get(jobId);
- }}
-
+ }
+ }
@Override
public List<AbstractEvent> getEvents(JobID jobID) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index f4c67df..9c8ae27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -33,13 +33,13 @@ import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -89,7 +89,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
}
else if("taskmanagers".equals(req.getParameter("get"))) {
- resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
+ resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
}
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
@@ -464,28 +464,27 @@ public class JobmanagerInfoServlet extends HttpServlet {
boolean first = true;
for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
- int num = 0;
for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+ Execution exec = vertex.getCurrentExecutionAttempt();
+
if(first) {
first = false;
} else {
wrt.write(","); }
- wrt.write("\""+jobVertex.getJobVertex()+"-"+num +"\": {");
- wrt.write("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
+ wrt.write("\""+exec.getAttemptId() +"\": {");
+ wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
wrt.write("\"vertexname\": \"" + vertex + "\",");
wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
- wrt.write("\"STARTING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+ wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
wrt.write("}");
-
- num++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 745a9f8..6ac8613 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -35,7 +35,7 @@ public class JsonFactory {
public static String toJson(ExecutionVertex vertex) {
StringBuilder json = new StringBuilder("");
json.append("{");
- json.append("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
+ json.append("\"vertexid\": \"" + vertex.getCurrentExecutionAttempt().getAttemptId() + "\",");
json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
@@ -66,7 +66,7 @@ public class JsonFactory {
ExecutionVertex[] vertices = jobVertex.getTaskVertices();
- for(int j = 0; j < vertices.length; j++) {
+ for (int j = 0; j < vertices.length; j++) {
ExecutionVertex vertex = vertices[j];
json.append(toJson(vertex));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9803657a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index a351209..223f8ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -516,9 +516,10 @@ public class ExecutionVertexCancelTest {
Scheduler scheduler = mock(Scheduler.class);
vertex.scheduleForExecution(scheduler, false);
- fail("Method should throw an exception");
}
- catch (IllegalStateException e) {}
+ catch (Exception e) {
+ fail("should not throw an exception");
+ }
// deploying while in canceling state is illegal (should immediately go to canceled)