You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:20 UTC

[48/51] [abbrv] flink git commit: [FLINK-2687] [monitoring api] Add handlers for subtask details and accumulators

[FLINK-2687] [monitoring api] Add handlers for subtask details and accumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3f79172
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3f79172
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3f79172

Branch: refs/heads/master
Commit: b3f791727c0713251a4b02a766eb177eb9327113
Parents: 616b7d5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 10 15:47:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:52 2015 +0200

----------------------------------------------------------------------
 flink-runtime-web/pom.xml                       |  18 -
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  26 +-
 .../AbstractExecutionGraphRequestHandler.java   |   4 +-
 .../AbstractJobVertexRequestHandler.java        |  62 ++
 .../AbstractSubtaskAttemptRequestHandler.java   |  68 ++
 .../handlers/AbstractSubtaskRequestHandler.java |  62 ++
 .../handlers/JobVertexAccumulatorsHandler.java  |  24 +-
 .../handlers/JobVertexDetailsHandler.java       |  32 +-
 .../webmonitor/handlers/JsonFactory.java        |   7 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |  39 ++
 ...taskExecutionAttemptAccumulatorsHandler.java |  68 ++
 .../SubtaskExecutionAttemptDetailsHandler.java  | 101 +++
 .../SubtasksAllAccumulatorsHandler.java         |  83 +++
 .../handlers/SubtasksTimesHandler.java          |  47 +-
 .../legacy/JobManagerInfoHandler.java           | 702 -------------------
 .../runtime/webmonitor/legacy/JsonFactory.java  | 112 ---
 .../runtime/webmonitor/runner/TestRunner.java   | 199 ------
 .../StringifiedAccumulatorResult.java           |  31 +-
 .../flink/runtime/executiongraph/Execution.java |  14 +
 .../runtime/executiongraph/ExecutionVertex.java |   9 +
 20 files changed, 579 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 0a05111..804e6da 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -90,24 +90,6 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-
-		<!-- ===================================================
-								Testing
-			=================================================== -->
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
 		
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 8aa25dd..4633dcf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -47,8 +47,11 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
-import org.apache.flink.runtime.webmonitor.legacy.JobManagerInfoHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,14 +64,16 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
 /**
- * The root component of the web runtime monitor.
- * 
- * <p>The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
+ * The root component of the web runtime monitor. This class starts the web server and creates
+ * all request handlers for the REST API.
+ * <p>
+ * The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
  * HTTP requests of different paths to different response handlers. In addition, it serves the static
- * files of the web frontend, such as HTML, CSS, or JS files.</p>
+ * files of the web frontend, such as HTML, CSS, or JS files.
  */
 public class WebRuntimeMonitor implements WebMonitor {
 
+	/** By default, all requests to the JobManager have a timeout of 10 seconds */ 
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
 	
 	/** Logger for web frontend startup / shutdown messages */
@@ -146,18 +151,17 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
 
-//			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt", handler(null))
-//			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt/accumulators", handler(null))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
 
 			.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
 			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
 			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
 
-					// the handler for the legacy requests
-				.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
-
-						// this handler serves all the static contents
+			// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(webRootDir));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 01f48e4..d9b4e59 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -40,7 +40,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 	
 	
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public final String handleRequest(Map<String, String> params) throws Exception {
 		String jidString = params.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -56,7 +56,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 		
 		ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid);
 		if (eg == null) {
-			throw new NotFoundException("Could not find execution graph for job " + jid);
+			throw new NotFoundException("Could not find job with id " + jid);
 		}
 		
 		return handleRequest(eg, params);

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
new file mode 100644
index 0000000..5b12907
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific job vertex (defined
+ * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter).  
+ */
+public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
+	
+	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		final String vidString = params.get("vertexid");
+		if (vidString == null) {
+			throw new IllegalArgumentException("vertexId parameter missing");
+		}
+
+		final JobVertexID vid;
+		try {
+			vid = JobVertexID.fromHexString(vidString);
+		}
+		catch (Exception e) {
+			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
+		}
+
+		final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+		if (jobVertex == null) {
+			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+		}
+
+		return handleRequest(jobVertex, params);
+	}
+	
+	public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
new file mode 100644
index 0000000..672df16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask execution attempt
+ * (defined via the "attempt" parameter) of a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job, defined via (defined voa the "jobid" parameter).  
+ */
+public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
+	
+	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+	
+	@Override
+	public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+		final String attemptNumberString = params.get("attempt");
+		if (attemptNumberString == null) {
+			throw new RuntimeException("Attempt number parameter missing");
+		}
+
+		final int attempt;
+		try {
+			attempt = Integer.parseInt(attemptNumberString);
+		}
+		catch (NumberFormatException e) {
+			throw new RuntimeException("Invalid attempt number parameter");
+		}
+		
+		final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+		if (attempt == currentAttempt.getAttemptNumber()) {
+			return handleRequest(currentAttempt, params);
+		}
+		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
+			Execution exec = vertex.getPriorExecutionAttempt(attempt);
+			return handleRequest(exec, params);
+		}
+		else {
+			throw new RuntimeException("Attempt does not exist: " + attempt);
+		}
+	}
+
+	public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
new file mode 100644
index 0000000..90866c6
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job, defined via (defined voa the "jobid" parameter).  
+ */
+public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
+	
+	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		final String subtaskNumberString = params.get("subtasknum");
+		if (subtaskNumberString == null) {
+			throw new RuntimeException("Subtask number parameter missing");
+		}
+
+		final int subtask;
+		try {
+			subtask = Integer.parseInt(subtaskNumberString);
+		}
+		catch (NumberFormatException e) {
+			throw new RuntimeException("Invalid subtask number parameter");
+		}
+		
+		if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
+			throw new RuntimeException("subtask does not exist: " + subtask); 
+		}
+		
+		final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+		return handleRequest(vertex, params);
+	}
+
+	public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8554a31..ed2c541 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,41 +21,21 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
 import java.util.Map;
 
 
-public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
 	
 	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
-		String vidString = params.get("vertexid");
-		if (vidString == null) {
-			throw new IllegalArgumentException("vertexId parameter missing");
-		}
-
-		JobVertexID vid;
-		try {
-			vid = JobVertexID.fromHexString(vidString);
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
-		}
-		
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
-		if (jobVertex == null) {
-			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
-		}
-
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7815f49..21e2868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -19,47 +19,31 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
 import java.util.Map;
 
-
-public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+/**
+ * A request handler that provides the details of a job vertex, including id, name, parallelism,
+ * and the runtime and metrics of all its subtasks.
+ */
+public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
 	
 	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
-		String vidString = params.get("vertexid");
-		if (vidString == null) {
-			throw new IllegalArgumentException("vertexId parameter missing");
-		}
-
-		JobVertexID vid;
-		try {
-			vid = JobVertexID.fromHexString(vidString);
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
-		}
-		
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
-		if (jobVertex == null) {
-			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
-		}
-
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final long now = System.currentTimeMillis();
 		
 		StringWriter writer = new StringWriter();
@@ -123,6 +107,8 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandle
 			gen.writeEndObject();
 			
 			gen.writeEndObject();
+			
+			num++;
 		}
 		gen.writeEndArray();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
index cb83361..e886532 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-
+/**
+ * A holder for the singleton Jackson JSON factory. Since the Jackson's JSON factory object
+ * is a heavyweight object that is encouraged to be shared, we use a singleton instance across
+ * all request handlers.
+ */
 public class JsonFactory {
 
+	/** The singleton Jackson JSON factory. */
 	public static final com.fasterxml.jackson.core.JsonFactory jacksonFactory =
 			new com.fasterxml.jackson.core.JsonFactory();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
new file mode 100644
index 0000000..d301bd1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Request handler providing details about a single task execution attempt.
+ */
+public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
+	
+	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+		return handleRequest(vertex.getCurrentExecutionAttempt(), params);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
new file mode 100644
index 0000000..f177b47
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific job vertex (defined
+ * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter).  
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler implements RequestHandler.JsonResponse {
+	
+	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
+		
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+
+		gen.writeNumberField("subtask", execAttempt.getVertex().getSubTaskIndex());
+		gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+		gen.writeStringField("id", execAttempt.getAttemptId().toShortString());
+		
+		gen.writeArrayFieldStart("user-accumulators");
+		for (StringifiedAccumulatorResult acc : accs) {
+			gen.writeStartObject();
+			gen.writeStringField("name", acc.getName());
+			gen.writeStringField("type", acc.getType());
+			gen.writeStringField("value", acc.getValue());
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+		
+		gen.writeEndObject();
+		
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
new file mode 100644
index 0000000..41aa929
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler providing details about a single task execution attempt.
+ */
+public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler implements RequestHandler.JsonResponse {
+	
+	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+		final ExecutionState status = execAttempt.getState();
+		final long now = System.currentTimeMillis();
+
+		InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
+		String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+		long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+		if (startTime == 0) {
+			startTime = -1;
+		}
+		long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
+		long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators();
+		LongCounter readBytes;
+		LongCounter writeBytes;
+		LongCounter readRecords;
+		LongCounter writeRecords;
+
+		if (metrics != null) {
+			readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+			writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+			readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+			writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+		}
+		else {
+			readBytes = null;
+			writeBytes = null;
+			readRecords = null;
+			writeRecords = null;
+		}
+
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeNumberField("subtask", execAttempt.getVertex().getSubTaskIndex());
+		gen.writeStringField("status", status.name());
+		gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+		gen.writeStringField("host", locationString);
+		gen.writeNumberField("start-time", startTime);
+		gen.writeNumberField("end-time", endTime);
+		gen.writeNumberField("duration", duration);
+
+		gen.writeObjectFieldStart("metrics");
+		gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
+		gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
+		gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
+		gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+		gen.writeEndObject();
+
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..4f97292
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns the accumulators for all subtasks of job vertex.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
+	
+	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+		gen.writeNumberField("parallelism", jobVertex.getParallelism());
+
+		gen.writeArrayFieldStart("subtasks");
+		
+		int num = 0;
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+
+			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			String locationString = location == null ? "(unassigned)" : location.getHostname();
+			
+			gen.writeStartObject();
+			
+			gen.writeNumberField("subtask", num++);
+			gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber());
+			gen.writeStringField("host", locationString);
+
+			StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
+			gen.writeArrayFieldStart("user-accumulators");
+			for (StringifiedAccumulatorResult acc : accs) {
+				gen.writeStartObject();
+				gen.writeStringField("name", acc.getName());
+				gen.writeStringField("type", acc.getType());
+				gen.writeStringField("value", acc.getValue());
+				gen.writeEndObject();
+			}
+			gen.writeEndArray();
+			
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		gen.writeEndObject();
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 8cd7b44..b6fa136 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -19,21 +19,21 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
 import java.util.Map;
 
 /**
- * Request handler that returns the JSON program plan of a job graph.
+ * Request handler that returns the state transition timestamps for all subtasks, plus their
+ * location and duration.
  */
-public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
 
 	
 	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
@@ -41,25 +41,8 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
-		String vidString = params.get("vertexid");
-		if (vidString == null) {
-			throw new IllegalArgumentException("vertexId parameter missing");
-		}
-
-		JobVertexID vid;
-		try {
-			vid = JobVertexID.fromHexString(vidString);
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
-		}
-
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
-		if (jobVertex == null) {
-			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
-		}
-
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		final long now = System.currentTimeMillis();
 
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
@@ -68,25 +51,37 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
 
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
 		gen.writeStringField("name", jobVertex.getJobVertex().getName());
-		gen.writeNumberField("now", System.currentTimeMillis());
+		gen.writeNumberField("now", now);
 		
 		gen.writeArrayFieldStart("subtasks");
 
 		int num = 0;
 		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			
+			long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
+			ExecutionState status = vertex.getExecutionState();
+
+			long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];
+			
+			long start = scheduledTime > 0 ? scheduledTime : -1;
+			long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
+			long duration = start >= 0 ? end - start : -1L;
+			
 			gen.writeStartObject();
-			gen.writeNumberField("subtask", num);
+			gen.writeNumberField("subtask", num++);
 
 			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname();
 			gen.writeStringField("host", locationString);
 
+			gen.writeNumberField("duration", duration);
+			
 			gen.writeObjectFieldStart("timestamps");
-			long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
 			for (ExecutionState state : ExecutionState.values()) {
 				gen.writeNumberField(state.name(), timestamps[state.ordinal()]);
 			}
 			gen.writeEndObject();
+			
 			gen.writeEndObject();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
deleted file mode 100644
index c65bc0f..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.legacy;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.router.KeepAliveWrite;
-import io.netty.handler.codec.http.router.Routed;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Tuple3;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-@ChannelHandler.Sharable
-public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoHandler.class);
-
-	private static final Charset ENCODING = Charset.forName("UTF-8");
-
-	/** Underlying JobManager */
-	private final ActorGateway jobmanager;
-	private final ActorGateway archive;
-	private final FiniteDuration timeout;
-
-
-	public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
-		this.jobmanager = jobmanager;
-		this.archive = archive;
-		this.timeout = timeout;
-	}
-
-	@Override
-	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		DefaultFullHttpResponse response;
-		try {
-			String result = handleRequest(routed);
-			byte[] bytes = result.getBytes(ENCODING);
-
-			response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
-		}
-		catch (Exception e) {
-			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
-		}
-
-		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-		KeepAliveWrite.flush(ctx, routed.request(), response);
-	}
-
-
-	@SuppressWarnings("unchecked")
-	private String handleRequest(Routed routed) throws Exception {
-		if ("archive".equals(routed.queryParam("get"))) {
-			Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if(!(result instanceof ArchiveMessages.ArchivedJobs)) {
-				throw new RuntimeException("RequestArchiveJobs requires a response of type " +
-						"ArchivedJobs. Instead the response is of type " + result.getClass() +".");
-			}
-			else {
-				final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
-						((ArchiveMessages.ArchivedJobs) result).asJavaCollection());
-
-				return writeJsonForArchive(archivedJobs);
-			}
-		}
-		else if ("jobcounts".equals(routed.queryParam("get"))) {
-			Future<Object> response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if (!(result instanceof Tuple3)) {
-				throw new RuntimeException("RequestJobCounts requires a response of type " +
-						"Tuple3. Instead the response is of type " + result.getClass() +
-						".");
-			}
-			else {
-				return writeJsonForJobCounts((Tuple3<Integer, Integer, Integer>) result);
-			}
-		}
-		else if ("job".equals(routed.queryParam("get"))) {
-			String jobId = routed.queryParam("job");
-
-			Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-					timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if (!(result instanceof JobManagerMessages.JobResponse)){
-				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-						"Instead the response is of type " + result.getClass());
-			}
-			else {
-				final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
-
-				if (jobResponse instanceof JobManagerMessages.JobFound){
-					ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)result).executionGraph();
-					return writeJsonForArchivedJob(archivedJob);
-				}
-				else {
-					throw new Exception("DoGet:job: Could not find job for job ID " + jobId);
-				}
-			}
-		}
-		else if ("groupvertex".equals(routed.queryParam("get"))) {
-			String jobId = routed.queryParam("job");
-			String groupVertexId = routed.queryParam("groupvertex");
-
-			// No group vertex specified
-			if (groupVertexId.equals("null")) {
-				throw new Exception("Found null groupVertexId");
-			}
-
-			Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-					timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if (!(result instanceof JobManagerMessages.JobResponse)){
-				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-						"Instead the response is of type " + result.getClass());
-			}
-			else {
-				final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
-
-				if (jobResponse instanceof JobManagerMessages.JobFound) {
-					ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)jobResponse).executionGraph();
-
-					return writeJsonForArchivedJobGroupvertex(archivedJob, JobVertexID.fromHexString(groupVertexId));
-				}
-				else {
-					throw new Exception("DoGet:groupvertex: Could not find job for job ID " + jobId);
-				}
-			}
-		}
-		else if ("taskmanagers".equals(routed.queryParam("get"))) {
-			Future<Object> response = jobmanager.ask(
-					JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-					timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if (!(result instanceof Integer)) {
-				throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
-						"response of type Integer. Instead the response is of type " +
-						result.getClass() + ".");
-			}
-			else {
-				final int numberOfTaskManagers = (Integer)result;
-
-				final Future<Object> responseRegisteredSlots = jobmanager.ask(
-						JobManagerMessages.getRequestTotalNumberOfSlots(),
-						timeout);
-
-				final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
-						timeout);
-
-				if (!(resultRegisteredSlots instanceof Integer)) {
-					throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
-							"type Integer. Instaed the response of type " +
-							resultRegisteredSlots.getClass() + ".");
-				}
-				else {
-					final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
-
-					return "{\"taskmanagers\": " + numberOfTaskManagers + ", " +
-							"\"slots\": " + numberOfRegisteredSlots + "}";
-				}
-			}
-		}
-		else if ("cancel".equals(routed.queryParam("get"))) {
-			String jobId = routed.queryParam("job");
-
-			Future<Object> response = jobmanager.ask(new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
-					timeout);
-
-			Await.ready(response, timeout);
-			return "{}";
-		}
-		else if ("updates".equals(routed.queryParam("get"))) {
-			String jobId = routed.queryParam("job");
-			return writeJsonUpdatesForJob(JobID.fromHexString(jobId));
-		}
-		else if ("version".equals(routed.queryParam("get"))) {
-			return writeJsonForVersion();
-		}
-		else{
-			Future<Object> response = jobmanager.ask(JobManagerMessages.getRequestRunningJobs(),
-					timeout);
-
-			Object result = Await.result(response, timeout);
-
-			if(!(result instanceof JobManagerMessages.RunningJobs)){
-				throw new RuntimeException("RequestRunningJobs requires a response of type " +
-						"RunningJobs. Instead the response of type " + result.getClass() + ".");
-			}
-			else {
-				final Iterable<ExecutionGraph> runningJobs =
-						((JobManagerMessages.RunningJobs) result).asJavaIterable();
-
-				return writeJsonForJobs(runningJobs);
-			}
-		}
-	}
-
-	private String writeJsonForJobs(Iterable<ExecutionGraph> graphs) {
-		StringBuilder bld = new StringBuilder();
-		bld.append("[");
-
-		Iterator<ExecutionGraph> it = graphs.iterator();
-		// Loop Jobs
-		while(it.hasNext()){
-			ExecutionGraph graph = it.next();
-
-			writeJsonForJob(bld, graph);
-
-			//Write seperator between json objects
-			if(it.hasNext()) {
-				bld.append(",");
-			}
-		}
-		bld.append("]");
-
-		return bld.toString();
-	}
-
-	private void writeJsonForJob(StringBuilder bld, ExecutionGraph graph) {
-		//Serialize job to json
-		bld.append("{");
-		bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
-		bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
-		bld.append("\"status\": \"").append(graph.getState()).append("\",");
-		bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())).append(",");
-
-		// Serialize ManagementGraph to json
-		bld.append("\"groupvertices\": [");
-		boolean first = true;
-
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			//Write seperator between json objects
-			if (first) {
-				first = false;
-			} else {
-				bld.append(",");
-			}
-			bld.append(JsonFactory.toJson(groupVertex));
-		}
-		bld.append("]");
-		bld.append("}");
-	}
-
-	private String writeJsonForArchive(List<ExecutionGraph> graphs) {
-		StringBuilder bld = new StringBuilder();
-		bld.append("[");
-
-		// sort jobs by time
-		Collections.sort(graphs, new Comparator<ExecutionGraph>() {
-			@Override
-			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
-				if (o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-
-		});
-
-		// Loop Jobs
-		for (int i = 0; i < graphs.size(); i++) {
-			ExecutionGraph graph = graphs.get(i);
-
-			//Serialize job to json
-			bld.append("{");
-			bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
-			bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
-			bld.append("\"status\": \"").append(graph.getState()).append("\",");
-			bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState()));
-
-			bld.append("}");
-
-			//Write seperator between json objects
-			if(i != graphs.size() - 1) {
-				bld.append(",");
-			}
-		}
-		bld.append("]");
-		return bld.toString();
-	}
-
-	private String writeJsonForJobCounts(Tuple3<Integer, Integer, Integer> jobCounts) {
-		return "{\"finished\": " + jobCounts._1() + ",\"canceled\": " + jobCounts._2() + ",\"failed\": "
-				+ jobCounts._3() + "}";
-	}
-
-
-	private String writeJsonForArchivedJob(ExecutionGraph graph) {
-		StringBuilder bld = new StringBuilder();
-
-		bld.append("[");
-		bld.append("{");
-		bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
-		bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
-		bld.append("\"status\": \"").append(graph.getState()).append("\",");
-		bld.append("\"SCHEDULED\": ").append(graph.getStatusTimestamp(JobStatus.CREATED)).append(",");
-		bld.append("\"RUNNING\": ").append(graph.getStatusTimestamp(JobStatus.RUNNING)).append(",");
-		bld.append("\"FINISHED\": ").append(graph.getStatusTimestamp(JobStatus.FINISHED)).append(",");
-		bld.append("\"FAILED\": ").append(graph.getStatusTimestamp(JobStatus.FAILED)).append(",");
-		bld.append("\"CANCELED\": ").append(graph.getStatusTimestamp(JobStatus.CANCELED)).append(",");
-
-		if (graph.getState() == JobStatus.FAILED) {
-			bld.append("\"failednodes\": [");
-			boolean first = true;
-			for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
-				if (vertex.getExecutionState() == ExecutionState.FAILED) {
-					InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
-					Throwable failureCause = vertex.getFailureCause();
-					if (location != null || failureCause != null) {
-						if (first) {
-							first = false;
-						} else {
-							bld.append(",");
-						}
-						bld.append("{");
-						bld.append("\"node\": \"").append(location == null ? "(none)" : location.getFQDNHostname()).append("\",");
-						bld.append("\"message\": \"").append(failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))).append("\"");
-						bld.append("}");
-					}
-				}
-			}
-			bld.append("],");
-		}
-
-		// Serialize ManagementGraph to json
-		bld.append("\"groupvertices\": [");
-		boolean first = true;
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			//Write seperator between json objects
-			if (first) {
-				first = false;
-			} else {
-				bld.append(",");
-			}
-
-			bld.append(JsonFactory.toJson(groupVertex));
-
-		}
-		bld.append("],");
-
-		// write user config
-		ExecutionConfig ec = graph.getExecutionConfig();
-		if(ec != null) {
-			bld.append("\"executionConfig\": {");
-			bld.append("\"Execution Mode\": \"").append(ec.getExecutionMode()).append("\",");
-			bld.append("\"Number of execution retries\": \"").append(ec.getNumberOfExecutionRetries()).append("\",");
-			bld.append("\"Job parallelism\": \"").append(ec.getParallelism()).append("\",");
-			bld.append("\"Object reuse mode\": \"").append(ec.isObjectReuseEnabled()).append("\"");
-			ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
-			if(uc != null) {
-				Map<String, String> ucVals = uc.toMap();
-				if (ucVals != null) {
-					String ucString = "{";
-					int i = 0;
-					for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
-						ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
-						if (++i < ucVals.size()) {
-							ucString += ",\n";
-						}
-					}
-					bld.append(", \"userConfig\": ").append(ucString).append("}");
-				}
-				else {
-					LOG.debug("GlobalJobParameters.toMap() did not return anything");
-				}
-			}
-			else {
-				LOG.debug("No GlobalJobParameters were set in the execution config");
-			}
-			bld.append("},");
-		}
-		else {
-			LOG.warn("Unable to retrieve execution config from execution graph");
-		}
-
-		// write accumulators
-		final Future<Object> response = jobmanager.ask(
-				new RequestAccumulatorResultsStringified(graph.getJobID()),
-				timeout);
-
-		Object result;
-		try {
-			result = Await.result(response, timeout);
-		}
-		catch (Exception ex) {
-			throw new RuntimeException("Could not retrieve the accumulator results from the job manager.", ex);
-		}
-
-		if (result instanceof AccumulatorResultStringsFound) {
-			StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
-
-			bld.append("\n\"accumulators\": [");
-			int i = 0;
-			for (StringifiedAccumulatorResult accumulator : accumulators) {
-				bld.append("{ \"name\": \"").append(accumulator.getName()).append(" (").append(accumulator.getType()).append(")\",").append(" \"value\": \"").append(accumulator.getValue()).append("\"}\n");
-				if (++i < accumulators.length) {
-					bld.append(",");
-				}
-			}
-			bld.append("],\n");
-		}
-		else if (result instanceof AccumulatorResultsNotFound) {
-			bld.append("\n\"accumulators\": [],");
-		}
-		else if (result instanceof AccumulatorResultsErroneous) {
-			LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
-					((AccumulatorResultsErroneous) result).cause());
-		}
-		else {
-			throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
-					"AccumulatorResultStringsFound. Instead the response is of type " +
-					result.getClass() + ".");
-		}
-
-		bld.append("\"groupverticetimes\": {");
-		first = true;
-
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			if (first) {
-				first = false;
-			} else {
-				bld.append(",");
-			}
-
-			// Calculate start and end time for groupvertex
-			long started = Long.MAX_VALUE;
-			long ended = 0;
-
-			// Take earliest running state and latest endstate of groupmembers
-			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-				long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
-				if (running != 0 && running < started) {
-					started = running;
-				}
-
-				long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
-				long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
-				long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-
-				if (finished != 0 && finished > ended) {
-					ended = finished;
-				}
-
-				if (canceled != 0 && canceled > ended) {
-					ended = canceled;
-				}
-
-				if (failed != 0 && failed > ended) {
-					ended = failed;
-				}
-
-			}
-
-			bld.append("\"").append(groupVertex.getJobVertexId()).append("\": {");
-			bld.append("\"groupvertexid\": \"").append(groupVertex.getJobVertexId()).append("\",");
-			bld.append("\"groupvertexname\": \"").append(groupVertex).append("\",");
-			bld.append("\"STARTED\": ").append(started).append(",");
-			bld.append("\"ENDED\": ").append(ended);
-			bld.append("}");
-
-		}
-
-		bld.append("}");
-		bld.append("}");
-		bld.append("]");
-
-		return bld.toString();
-	}
-
-
-	private String writeJsonUpdatesForJob(JobID jobId) {
-		final Future<Object> responseArchivedJobs = jobmanager.ask(
-				JobManagerMessages.getRequestRunningJobs(),
-				timeout);
-
-		Object resultArchivedJobs;
-		try{
-			resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
-		}
-		catch (Exception ex) {
-			throw new RuntimeException("Could not retrieve archived jobs from the job manager.", ex);
-		}
-
-		if(!(resultArchivedJobs instanceof JobManagerMessages.RunningJobs)){
-			throw new RuntimeException("RequestArchivedJobs requires a response of type " +
-					"RunningJobs. Instead the response is of type " +
-					resultArchivedJobs.getClass() + ".");
-		}
-		else {
-			final Iterable<ExecutionGraph> graphs = ((JobManagerMessages.RunningJobs)resultArchivedJobs).
-					asJavaIterable();
-
-			//Serialize job to json
-			final StringBuilder bld = new StringBuilder();
-
-			bld.append("{");
-			bld.append("\"jobid\": \"").append(jobId).append("\",");
-			bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\",");
-			bld.append("\"recentjobs\": [");
-
-			boolean first = true;
-
-			for (ExecutionGraph g : graphs){
-				if (first) {
-					first = false;
-				} else {
-					bld.append(",");
-				}
-
-				bld.append("\"").append(g.getJobID()).append("\"");
-			}
-			bld.append("],");
-
-			final Future<Object> responseJob = jobmanager.ask(
-					new JobManagerMessages.RequestJob(jobId),
-					timeout);
-
-			Object resultJob;
-			try{
-				resultJob = Await.result(responseJob, timeout);
-			}
-			catch (Exception ex){
-				throw new RuntimeException("Could not retrieve the job with jobID " + jobId +
-						"from the job manager.", ex);
-			}
-
-			if (!(resultJob instanceof JobManagerMessages.JobResponse)) {
-				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-						"Instead the response is of type " + resultJob.getClass() + ".");
-			}
-			else {
-				final JobManagerMessages.JobResponse response = (JobManagerMessages.JobResponse) resultJob;
-
-				if (response instanceof JobManagerMessages.JobFound){
-					ExecutionGraph graph = ((JobManagerMessages.JobFound)response).executionGraph();
-
-					bld.append("\"vertexevents\": [");
-
-					first = true;
-					for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
-						if (first) {
-							first = false;
-						} else {
-							bld.append(",");
-						}
-
-						bld.append("{");
-						bld.append("\"vertexid\": \"").append(ev.getCurrentExecutionAttempt().getAttemptId()).append("\",");
-						bld.append("\"newstate\": \"").append(ev.getExecutionState()).append("\",");
-						bld.append("\"timestamp\": \"").append(ev.getStateTimestamp(ev.getExecutionState())).append("\"");
-						bld.append("}");
-					}
-
-					bld.append("],");
-
-					bld.append("\"jobevents\": [");
-
-					bld.append("{");
-					bld.append("\"newstate\": \"").append(graph.getState()).append("\",");
-					bld.append("\"timestamp\": \"").append(graph.getStatusTimestamp(graph.getState())).append("\"");
-					bld.append("}");
-
-					bld.append("]");
-
-					bld.append("}");
-				}
-				else {
-					bld.append("\"vertexevents\": [],");
-					bld.append("\"jobevents\": [");
-					bld.append("{");
-					bld.append("\"newstate\": \"").append(JobStatus.FINISHED.toString()).append("\",");
-					bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\"");
-					bld.append("}");
-					bld.append("]");
-					bld.append("}");
-				}
-			}
-
-			return bld.toString();
-		}
-	}
-
-	private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) {
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-		StringBuilder bld = new StringBuilder();
-
-		bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(",");
-
-		bld.append("\"verticetimes\": {");
-		boolean first = true;
-		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-
-			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-				Execution exec = vertex.getCurrentExecutionAttempt();
-
-				if(first) {
-					first = false;
-				} else {
-					bld.append(","); }
-
-				bld.append("\"").append(exec.getAttemptId()).append("\": {");
-				bld.append("\"vertexid\": \"").append(exec.getAttemptId()).append("\",");
-				bld.append("\"vertexname\": \"").append(vertex).append("\",");
-				bld.append("\"CREATED\": ").append(vertex.getStateTimestamp(ExecutionState.CREATED)).append(",");
-				bld.append("\"SCHEDULED\": ").append(vertex.getStateTimestamp(ExecutionState.SCHEDULED)).append(",");
-				bld.append("\"DEPLOYING\": ").append(vertex.getStateTimestamp(ExecutionState.DEPLOYING)).append(",");
-				bld.append("\"RUNNING\": ").append(vertex.getStateTimestamp(ExecutionState.RUNNING)).append(",");
-				bld.append("\"FINISHED\": ").append(vertex.getStateTimestamp(ExecutionState.FINISHED)).append(",");
-				bld.append("\"CANCELING\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELING)).append(",");
-				bld.append("\"CANCELED\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELED)).append(",");
-				bld.append("\"FAILED\": ").append(vertex.getStateTimestamp(ExecutionState.FAILED)).append("");
-				bld.append("}");
-			}
-
-		}
-		bld.append("}}");
-		return bld.toString();
-	}
-
-
-	private String writeJsonForVersion() {
-		return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" +
-				EnvironmentInformation.getRevisionInformation().commitId + "\"}";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
deleted file mode 100644
index fe18d3f..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.legacy;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.util.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class JsonFactory {
-
-	public static String toJson(ExecutionVertex vertex) {
-		StringBuilder json = new StringBuilder("");
-		json.append("{");
-		json.append("\"vertexid\": \"").append(vertex.getCurrentExecutionAttempt().getAttemptId()).append("\",");
-		json.append("\"vertexname\": \"").append(StringUtils.escapeHtml(vertex.getSimpleName())).append("\",");
-		json.append("\"vertexstatus\": \"").append(vertex.getExecutionState()).append("\",");
-		
-		InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
-		String instanceName = location == null ? "(null)" : location.getFQDNHostname();
-		
-		json.append("\"vertexinstancename\": \"").append(instanceName).append("\"");
-		json.append("}");
-		return json.toString();
-	}
-	
-	public static String toJson(ExecutionJobVertex jobVertex) {
-		StringBuilder json = new StringBuilder("");
-		
-		json.append("{");
-		json.append("\"groupvertexid\": \"").append(jobVertex.getJobVertexId()).append("\",");
-		json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\",");
-		json.append("\"numberofgroupmembers\": ").append(jobVertex.getParallelism()).append(",");
-		json.append("\"groupmembers\": [");
-		
-		// Count state status of group members
-		Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
-		
-		// initialize with 0
-		for (ExecutionState state : ExecutionState.values()) {
-			stateCounts.put(state, 0);
-		}
-		
-		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
-		
-		for (int j = 0; j < vertices.length; j++) {
-			ExecutionVertex vertex = vertices[j];
-			
-			json.append(toJson(vertex));
-			
-			// print delimiter
-			if (j != vertices.length - 1) {
-				json.append(",");
-			}
-			
-			// Increment state status count
-			int count =  stateCounts.get(vertex.getExecutionState()) + 1;
-			stateCounts.put(vertex.getExecutionState(), count);
-		}
-		
-		json.append("],");
-		json.append("\"backwardEdges\": [");
-		
-		List<IntermediateResult> inputs = jobVertex.getInputs();
-		
-		for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
-			ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
-			
-			json.append("{");
-			json.append("\"groupvertexid\": \"").append(input.getJobVertexId()).append("\",");
-			json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\"");
-			json.append("}");
-			
-			// print delimiter
-			if(inputNumber != inputs.size() - 1) {
-				json.append(",");
-			}
-		}
-		json.append("]");
-		
-		// list number of members for each status
-		for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
-			json.append(",\"").append(stateCount.getKey()).append("\": ").append(stateCount.getValue());
-		}
-		
-		json.append("}");
-		
-		return json.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
deleted file mode 100644
index a2978a1..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.runner;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.relational.util.WebLogData;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * Simple runner that brings up a local cluster with the web server and executes two
- * jobs to expose their data in the archive
- */
-@SuppressWarnings("serial")
-public class TestRunner {
-
-	public static void main(String[] args) throws Exception {
-
-		// start the cluster with the runtime monitor
-		Configuration configuration = new Configuration();
-		configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-		configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
-		configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
-			"flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
-		
-		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
-		cluster.start();
-
-		final int port = cluster.getLeaderRPCPort();
-		runWordCount(port);
-		runWebLogAnalysisExample(port);
-		runWordCount(port);
-
-		// block the thread
-		Object o = new Object();
-		synchronized (o) {
-			o.wait();
-		}
-		
-		cluster.shutdown();
-	}
-	
-	private static void runWordCount(int port) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-		
-		DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
-						.sum(1);
-
-		counts.print();
-	}
-	
-	private static void runWebLogAnalysisExample(int port) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
-		// get input data
-		DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
-		DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
-		DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
-
-		// Retain documents with keywords
-		DataSet<Tuple1<String>> filterDocs = documents
-				.filter(new FilterDocByKeyWords())
-				.project(0);
-
-		// Filter ranks by minimum rank
-		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
-				.filter(new FilterByRank());
-
-		// Filter visits by visit date
-		DataSet<Tuple1<String>> filterVisits = visits
-				.filter(new FilterVisitsByDate())
-				.project(0);
-
-		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
-		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
-				filterDocs.join(filterRanks)
-						.where(0).equalTo(1)
-						.projectSecond(0,1,2);
-
-		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
-		DataSet<Tuple3<Integer, String, Integer>> result =
-				joinDocsRanks.coGroup(filterVisits)
-						.where(1).equalTo(0)
-						.with(new AntiJoinVisits());
-
-		result.print();
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
-		private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// FILTER
-			// Only collect the document if all keywords are contained
-			String docText = value.f1;
-			for (String kw : KEYWORDS) {
-				if (!docText.contains(kw)) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
-	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
-		private static final int RANKFILTER = 40;
-
-		@Override
-		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
-			return (value.f0 > RANKFILTER);
-		}
-	}
-
-
-	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
-		private static final int YEARFILTER = 2007;
-
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// Parse date string with the format YYYY-MM-DD and extract the year
-			String dateString = value.f1;
-			int year = Integer.parseInt(dateString.substring(0,4));
-			return (year == YEARFILTER);
-		}
-	}
-	
-	
-	@FunctionAnnotation.ForwardedFieldsFirst("*")
-	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
-			// Check if there is a entry in the visits relation
-			if (!visits.iterator().hasNext()) {
-				for (Tuple3<Integer, String, Integer> next : ranks) {
-					// Emit all rank pairs
-					out.collect(next);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index ea01acd..a0d1eda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -56,20 +56,25 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{
 	// ------------------------------------------------------------------------
 
 	public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
-		StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
-		
-		int i = 0;
-		for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
-			StringifiedAccumulatorResult result;
-			Accumulator<?, ?> value = entry.getValue();
-			if (value != null) {
-				result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
-			} else {
-				result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+		if (accs == null || accs.isEmpty()) {
+			return new StringifiedAccumulatorResult[0];
+		}
+		else {
+			StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
+			
+			int i = 0;
+			for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
+				StringifiedAccumulatorResult result;
+				Accumulator<?, ?> value = entry.getValue();
+				if (value != null) {
+					result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+				} else {
+					result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+				}
+	
+				results[i++] = result;
 			}
-
-			results[i++] = result;
+			return results;
 		}
-		return results;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4cee2f3..faabfb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -959,6 +960,10 @@ public class Execution implements Serializable {
 		return vertex.getSimpleName() + " - execution #" + attemptNumber;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Accumulators
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Update accumulators (discarded when the Execution has already been terminated).
 	 * @param flinkAccumulators the flink internal accumulators
@@ -973,14 +978,23 @@ public class Execution implements Serializable {
 			}
 		}
 	}
+	
 	public Map<String, Accumulator<?, ?>> getUserAccumulators() {
 		return userAccumulators;
 	}
 
+	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+		return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
+	}
+
 	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
 		return flinkAccumulators;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Standard utilities
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public String toString() {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 78e9804..0d039cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -229,6 +229,15 @@ public class ExecutionVertex implements Serializable {
 		return currentExecution.getAssignedResourceLocation();
 	}
 	
+	public Execution getPriorExecutionAttempt(int attemptNumber) {
+		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+			return priorExecutions.get(attemptNumber);
+		}
+		else {
+			throw new IllegalArgumentException("attempt does not exist");
+		}
+	}
+	
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}