Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:56 UTC

[24/51] [abbrv] flink git commit: [FLINK-2687] [monitoring API] Extend vertex requests with subtask data and accumulators

[FLINK-2687] [monitoring API] Extend vertex requests with subtask data and accumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99a351e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99a351e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99a351e4

Branch: refs/heads/master
Commit: 99a351e4ef3048d8beac9f287d67c6f3ce3680b0
Parents: ba31f83
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 21 18:19:24 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:51 2015 +0200

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  |  11 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  15 ++-
 .../handlers/CurrentJobsOverviewHandler.java    |  11 +-
 .../handlers/DashboardConfigHandler.java        |   8 ++
 .../handlers/JobAccumulatorsHandler.java        |  65 +++++++++
 .../webmonitor/handlers/JobDetailsHandler.java  |  34 ++++-
 .../handlers/JobVertexAccumulatorsHandler.java  |  82 ++++++++++++
 .../handlers/JobVertexDetailsHandler.java       | 134 +++++++++++++++++++
 .../handlers/SubtasksTimesHandler.java          |   3 +-
 .../StringifiedAccumulatorResult.java           |  26 ++++
 .../runtime/executiongraph/ExecutionGraph.java  |  21 +--
 .../executiongraph/ExecutionJobVertex.java      |  67 ++++++----
 .../jobgraph/jsonplan/JsonPlanGenerator.java    |   1 -
 .../runtime/jobmanager/MemoryArchivist.scala    |   6 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java    |   2 -
 .../messages/WebMonitorMessagesTest.java        |   5 +-
 .../jsonplan/JsonJobGraphGenerationTest.java    |  28 ++--
 17 files changed, 431 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index bb48bdd..72670bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -58,7 +58,7 @@ public class AccumulatorHelper {
 	/**
 	 * Workaround method for type safety
 	 */
-	private static final <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
+	private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
 															Accumulator<?, ?> toMerge) {
 		@SuppressWarnings("unchecked")
 		Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
@@ -104,7 +104,7 @@ public class AccumulatorHelper {
 	public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Object> resultMap = new HashMap<String, Object>();
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
-			resultMap.put(entry.getKey(), (Object) entry.getValue().getLocalValue());
+			resultMap.put(entry.getKey(), entry.getValue().getLocalValue());
 		}
 		return resultMap;
 	}
@@ -112,8 +112,8 @@ public class AccumulatorHelper {
 	public static String getResultsFormated(Map<String, Object> map) {
 		StringBuilder builder = new StringBuilder();
 		for (Map.Entry<String, Object> entry : map.entrySet()) {
-			builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName()
-					+ ")" + ": " + entry.getValue().toString() + "\n");
+			builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
+			builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
 		}
 		return builder.toString();
 	}
@@ -127,8 +127,7 @@ public class AccumulatorHelper {
 		}
 	}
 
-	public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
-			?>> accumulators) {
+	public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
 
 		for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 1c1b723..8aa25dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -35,12 +35,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
@@ -139,16 +142,22 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))
 
+			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
+
+//			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt", handler(null))
+//			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt/accumulators", handler(null))
 
 			.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
 			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
 			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
 
-			// the handler for the legacy requests
-			.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+					// the handler for the legacy requests
+				.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
 
-			// this handler serves all the static contents
+						// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(webRootDir));
 	}
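
For orientation, here is a minimal client sketch against the three new routes registered above. It assumes the web monitor is reachable on localhost at its default port 8081 and that the job and vertex IDs are passed in by the caller; the class name and argument handling are illustrative only.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class NewVertexEndpointsProbe {

	public static void main(String[] args) throws Exception {
		// Assumptions: web monitor on localhost:8081, job/vertex IDs supplied by the caller.
		final String base = "http://localhost:8081";
		final String jobId = args[0];
		final String vertexId = args[1];

		final String[] paths = {
				"/jobs/" + jobId + "/vertices/" + vertexId,
				"/jobs/" + jobId + "/vertices/" + vertexId + "/accumulators",
				"/jobs/" + jobId + "/accumulators"
		};

		for (String path : paths) {
			HttpURLConnection conn = (HttpURLConnection) new URL(base + path).openConnection();
			conn.setRequestMethod("GET");

			StringBuilder body = new StringBuilder();
			BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
			try {
				String line;
				while ((line = reader.readLine()) != null) {
					body.append(line);
				}
			}
			finally {
				reader.close();
				conn.disconnect();
			}

			System.out.println(path + " -> " + body);
		}
	}
}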
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d792ff4..6444e4b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -61,6 +61,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 			
 			MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
 
+			final long now = System.currentTimeMillis();
 
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
@@ -70,20 +71,20 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 			if (includeRunningJobs && includeFinishedJobs) {
 				gen.writeArrayFieldStart("running");
 				for (JobDetails detail : result.getRunningJobs()) {
-					generateSingleJobDetails(detail, gen);
+					generateSingleJobDetails(detail, gen, now);
 				}
 				gen.writeEndArray();
 
 				gen.writeArrayFieldStart("finished");
 				for (JobDetails detail : result.getFinishedJobs()) {
-					generateSingleJobDetails(detail, gen);
+					generateSingleJobDetails(detail, gen, now);
 				}
 				gen.writeEndArray();
 			}
 			else {
 				gen.writeArrayFieldStart("jobs");
 				for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-					generateSingleJobDetails(detail, gen);
+					generateSingleJobDetails(detail, gen, now);
 				}
 				gen.writeEndArray();
 			}
@@ -97,7 +98,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 		}
 	}
 
-	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen) throws Exception {
+	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception {
 		gen.writeStartObject();
 
 		gen.writeStringField("jid", details.getJobId().toString());
@@ -106,7 +107,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 
 		gen.writeNumberField("start-time", details.getStartTime());
 		gen.writeNumberField("end-time", details.getEndTime());
-		gen.writeNumberField("duration", details.getEndTime() <= 0 ? -1L : details.getEndTime() - details.getStartTime());
+		gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
 		gen.writeNumberField("last-modification", details.getLastUpdateTime());
 
 		gen.writeObjectFieldStart("tasks");
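
The reworked "duration" field above means a job that is still running (end time <= 0) now reports its elapsed time relative to "now" instead of -1. A minimal sketch of that semantics with illustrative timestamps (class name is hypothetical):

public class DurationSemanticsSketch {

	public static void main(String[] args) {
		long now = System.currentTimeMillis();
		long startTime = now - 60000L; // started a minute ago (illustrative)
		long endTime = -1L;            // still running, so no end time yet

		// Same formula as generateSingleJobDetails above.
		long duration = (endTime <= 0 ? now : endTime) - startTime;
		System.out.println("duration = " + duration + " ms"); // ~60000 while running
	}
}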

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index c21c35d..ad72f0a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -46,6 +47,13 @@ public class DashboardConfigHandler implements RequestHandler, RequestHandler.Js
 			gen.writeNumberField("refresh-interval", refreshInterval);
 			gen.writeNumberField("timezone-offset", timeZoneOffset);
 			gen.writeStringField("timezone-name", timeZoneName);
+			gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
+
+			EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+			if (revision != null) {
+				gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
+			}
+
 			gen.writeEndObject();
 	
 			gen.close();
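
For reference, a small sketch of the calls the handler now uses to expose version and revision information; the println formatting is illustrative, the accessed fields mirror what is written into the JSON above.

import org.apache.flink.runtime.util.EnvironmentInformation;

public class FlinkVersionInfoSketch {

	public static void main(String[] args) {
		System.out.println("flink-version: " + EnvironmentInformation.getVersion());

		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
		if (revision != null) {
			System.out.println("flink-revision: " + revision.commitId + " @ " + revision.commitDate);
		}
	}
}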

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..2732497
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns the aggregated user accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+	
+	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
+		
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+
+		gen.writeArrayFieldStart("job-accumulators");
+		// empty for now
+		gen.writeEndArray();
+		
+		gen.writeArrayFieldStart("user-task-accumulators");
+		for (StringifiedAccumulatorResult acc : allAccumulators) {
+			gen.writeStartObject();
+			gen.writeStringField("name", acc.getName());
+			gen.writeStringField("type", acc.getType());
+			gen.writeStringField("value", acc.getValue());
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}
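
The handler above emits a "job-accumulators" array (empty for now) and a "user-task-accumulators" array with name/type/value entries. A minimal consumer sketch using Jackson's ObjectMapper, assuming the string passed in holds the response body of GET /jobs/:jobid/accumulators (class and method names are illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JobAccumulatorsResponseSketch {

	public static void printAccumulators(String json) throws Exception {
		JsonNode root = new ObjectMapper().readTree(json);

		// Field names match what JobAccumulatorsHandler writes.
		for (JsonNode acc : root.get("user-task-accumulators")) {
			System.out.println(acc.get("name").asText()
					+ " (" + acc.get("type").asText() + ") = " + acc.get("value").asText());
		}
	}
}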

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index feca11b..25b0f19 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -34,7 +34,14 @@ import java.io.StringWriter;
 import java.util.Map;
 
 /**
- * Request handler that returns the JSON program plan of a job graph.
+ * Request handler that returns details about a job, including:
+ * <ul>
+ *     <li>Dataflow plan</li>
+ *     <li>id, name, and current status</li>
+ *     <li>start time, end time, duration</li>
+ *     <li>number of job vertices in each state (pending, running, finished, failed)</li>
+ *     <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
+ * </ul>
  */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
 	
@@ -63,6 +70,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 		gen.writeNumberField("start-time", jobStartTime);
 		gen.writeNumberField("end-time", jobEndTime);
 		gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
+		gen.writeNumberField("now", now);
 		
 		// timestamps
 		gen.writeObjectFieldStart("timestamps");
@@ -71,9 +79,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 		}
 		gen.writeEndObject();
 		
-		final int[] tasksPerStatusTotal = new int[ExecutionState.values().length];
-		
 		// job vertices
+		int[] jobVerticesPerState = new int[ExecutionState.values().length];
 		gen.writeArrayFieldStart("vertices");
 
 		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
@@ -85,7 +92,6 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 			for (ExecutionVertex vertex : ejv.getTaskVertices()) {
 				final ExecutionState state = vertex.getExecutionState();
 				tasksPerState[state.ordinal()]++;
-				tasksPerStatusTotal[state.ordinal()]++;
 
 				// take the earliest start time
 				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
@@ -112,7 +118,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 				endTime = -1L;
 				duration = -1L;
 			}
-
+			
+			ExecutionState jobVertexState = 
+					ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+			jobVerticesPerState[jobVertexState.ordinal()]++;
+			
 			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = ejv.getAggregatedMetricAccumulators();
 
 			LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
@@ -124,7 +134,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 			gen.writeStringField("id", ejv.getJobVertexId().toString());
 			gen.writeStringField("name", ejv.getJobVertex().getName());
 			gen.writeNumberField("parallelism", ejv.getParallelism());
-			
+			gen.writeStringField("status", jobVertexState.name());
+
 			gen.writeNumberField("start-time", startTime);
 			gen.writeNumberField("end-time", endTime);
 			gen.writeNumberField("duration", duration);
@@ -141,11 +152,20 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
 			gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
 			gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
 			gen.writeEndObject();
-
+			
 			gen.writeEndObject();
 		}
 		gen.writeEndArray();
 
+		gen.writeObjectFieldStart("status-counts");
+		for (ExecutionState state : ExecutionState.values()) {
+			gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
+		}
+		gen.writeEndObject();
+
+		gen.writeFieldName("plan");
+		gen.writeRawValue(graph.getJsonPlan());
+
 		gen.writeEndObject();
 
 		gen.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
new file mode 100644
index 0000000..8554a31
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+
+public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+	
+	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		String vidString = params.get("vertexid");
+		if (vidString == null) {
+			throw new IllegalArgumentException("vertexId parameter missing");
+		}
+
+		JobVertexID vid;
+		try {
+			vid = JobVertexID.fromHexString(vidString);
+		}
+		catch (Exception e) {
+			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
+		}
+		
+		ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+		if (jobVertex == null) {
+			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+		}
+
+		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+		
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+		
+		gen.writeArrayFieldStart("user-accumulators");
+		for (StringifiedAccumulatorResult acc : accs) {
+			gen.writeStartObject();
+			gen.writeStringField("name", acc.getName());
+			gen.writeStringField("type", acc.getType());
+			gen.writeStringField("value", acc.getValue());
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+		
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
new file mode 100644
index 0000000..7815f49
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+
+public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+	
+	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		String vidString = params.get("vertexid");
+		if (vidString == null) {
+			throw new IllegalArgumentException("vertexId parameter missing");
+		}
+
+		JobVertexID vid;
+		try {
+			vid = JobVertexID.fromHexString(vidString);
+		}
+		catch (Exception e) {
+			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
+		}
+		
+		ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+		if (jobVertex == null) {
+			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+		}
+
+		final long now = System.currentTimeMillis();
+		
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+
+		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+		gen.writeStringField("name", jobVertex.getJobVertex().getName());
+		gen.writeNumberField("parallelism", jobVertex.getParallelism());
+		gen.writeNumberField("now", now);
+
+		gen.writeArrayFieldStart("subtasks");
+		int num = 0;
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			final ExecutionState status = vertex.getExecutionState();
+			
+			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+			long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+			if (startTime == 0) {
+				startTime = -1;
+			}
+			long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
+			long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+			
+			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+			LongCounter readBytes;
+			LongCounter writeBytes;
+			LongCounter readRecords;
+			LongCounter writeRecords;
+			
+			if (metrics != null) {
+				readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+				writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+				readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+				writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+			}
+			else {
+				readBytes = null;
+				writeBytes = null;
+				readRecords = null;
+				writeRecords = null;
+			}
+			
+			gen.writeStartObject();
+			gen.writeNumberField("subtask", num);
+			gen.writeStringField("status", status.name());
+			gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber());
+			gen.writeStringField("host", locationString);
+			gen.writeNumberField("start-time", startTime);
+			gen.writeNumberField("end-time", endTime);
+			gen.writeNumberField("duration", duration);
+
+			gen.writeObjectFieldStart("metrics");
+			gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
+			gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
+			gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
+			gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+			gen.writeEndObject();
+			
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+		
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}
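
As with the job accumulators, the per-subtask response can be consumed with Jackson; a minimal sketch that lists status, attempt, host, and read bytes for each subtask, with field names taken from the handler above (class and method names are illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class VertexSubtasksResponseSketch {

	public static void printSubtasks(String json) throws Exception {
		JsonNode root = new ObjectMapper().readTree(json);

		for (JsonNode subtask : root.get("subtasks")) {
			System.out.println("subtask " + subtask.get("subtask").asInt()
					+ " [" + subtask.get("status").asText() + "]"
					+ " attempt=" + subtask.get("attempt").asInt()
					+ " host=" + subtask.get("host").asText()
					+ " read-bytes=" + subtask.get("metrics").get("read-bytes").asLong());
		}
	}
}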

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 4d268ef..8cd7b44 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -68,7 +68,8 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
 
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
 		gen.writeStringField("name", jobVertex.getJobVertex().getName());
-
+		gen.writeNumberField("now", System.currentTimeMillis());
+		
 		gen.writeArrayFieldStart("subtasks");
 
 		int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index 3b25fe0..ea01acd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.runtime.accumulators;
 
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+import java.util.Map;
+
 /**
  * Container class that transports the result of an accumulator as set of strings.
  */
@@ -46,4 +50,26 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{
 	public String getValue() {
 		return value;
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
+		StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
+		
+		int i = 0;
+		for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
+			StringifiedAccumulatorResult result;
+			Accumulator<?, ?> value = entry.getValue();
+			if (value != null) {
+				result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+			} else {
+				result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+			}
+
+			results[i++] = result;
+		}
+		return results;
+	}
 }
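
A minimal usage sketch of the new utility; the accumulator names and values below are made up for illustration, only the stringifyAccumulatorResults call and the getName/getType/getValue accessors come from the class above.

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;

import java.util.HashMap;
import java.util.Map;

public class StringifyAccumulatorsSketch {

	public static void main(String[] args) {
		Map<String, Accumulator<?, ?>> accs = new HashMap<String, Accumulator<?, ?>>();

		IntCounter lines = new IntCounter();
		lines.add(42);
		accs.put("num-lines", lines);

		LongCounter bytes = new LongCounter();
		bytes.add(1024L);
		accs.put("num-bytes", bytes);

		for (StringifiedAccumulatorResult result : StringifiedAccumulatorResult.stringifyAccumulatorResults(accs)) {
			System.out.println(result.getName() + " (" + result.getType() + ") = " + result.getValue());
		}
	}
}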

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 47175a9..70a49fd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -584,27 +584,8 @@ public class ExecutionGraph implements Serializable {
 	 * @return an Array containing the StringifiedAccumulatorResult objects
 	 */
 	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
-
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
-
-		int num = accumulatorMap.size();
-		StringifiedAccumulatorResult[] resultStrings = new StringifiedAccumulatorResult[num];
-
-		int i = 0;
-		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-
-			StringifiedAccumulatorResult result;
-			Accumulator<?, ?> value = entry.getValue();
-			if (value != null) {
-				result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
-			} else {
-				result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
-			}
-
-			resultStrings[i++] = result;
-		}
-
-		return resultStrings;
+		return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2d6af6f..53d94d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
@@ -27,6 +28,7 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -218,33 +220,12 @@ public class ExecutionJobVertex implements Serializable {
 	}
 	
 	public ExecutionState getAggregateState() {
-		
 		int[] num = new int[ExecutionState.values().length];
-		
 		for (ExecutionVertex vertex : this.taskVertices) {
 			num[vertex.getExecutionState().ordinal()]++;
 		}
-
-		if (num[ExecutionState.FAILED.ordinal()] > 0) {
-			return ExecutionState.FAILED;
-		}
-		if (num[ExecutionState.CANCELING.ordinal()] > 0) {
-			return ExecutionState.CANCELING;
-		}
-		else if (num[ExecutionState.CANCELED.ordinal()] > 0) {
-			return ExecutionState.CANCELED;
-		}
-		else if (num[ExecutionState.RUNNING.ordinal()] > 0) {
-			return ExecutionState.RUNNING;
-		}
-		else if (num[ExecutionState.FINISHED.ordinal()] > 0) {
-			return num[ExecutionState.FINISHED.ordinal()] == parallelism ?
-					ExecutionState.FINISHED : ExecutionState.RUNNING;
-		}
-		else {
-			// all else collapses under created
-			return ExecutionState.CREATED;
-		}
+		
+		return getAggregateJobVertexState(num, parallelism);
 	}
 	
 	//---------------------------------------------------------------------------------------------
@@ -528,6 +509,19 @@ public class ExecutionJobVertex implements Serializable {
 		return agg;
 	}
 
+	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+
+		for (ExecutionVertex vertex : taskVertices) {
+			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+			if (next != null) {
+				AccumulatorHelper.mergeInto(userAccumulators, next);
+			}
+		}
+
+		return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static / pre-assigned input splits
 	// --------------------------------------------------------------------------------------------
@@ -665,4 +659,31 @@ public class ExecutionJobVertex implements Serializable {
 			}
 		}
 	}
+
+	public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
+		if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
+			throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
+		}
+
+		if (verticesPerState[ExecutionState.FAILED.ordinal()] > 0) {
+			return ExecutionState.FAILED;
+		}
+		if (verticesPerState[ExecutionState.CANCELING.ordinal()] > 0) {
+			return ExecutionState.CANCELING;
+		}
+		else if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
+			return ExecutionState.CANCELED;
+		}
+		else if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
+			return ExecutionState.RUNNING;
+		}
+		else if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
+			return verticesPerState[ExecutionState.FINISHED.ordinal()] == parallelism ?
+					ExecutionState.FINISHED : ExecutionState.RUNNING;
+		}
+		else {
+			// all else collapses under created
+			return ExecutionState.CREATED;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
index 0844dab..bd5357e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
@@ -41,7 +41,6 @@ public class JsonPlanGenerator {
 
 			final JsonFactory factory = new JsonFactory();
 			final JsonGenerator gen = factory.createGenerator(writer);
-			gen.useDefaultPrettyPrinter();
 			
 			// start of everything
 			gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 55a7ccb..d3fc8b1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -157,7 +157,7 @@ class MemoryArchivist(private val max_entries: Int)
     message match {
       case _ : RequestJobsOverview =>
         try {
-          sender ! createJobsOverview()
+          sender ! decorateMessage(createJobsOverview())
         }
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
@@ -165,7 +165,7 @@ class MemoryArchivist(private val max_entries: Int)
   
       case _ : RequestJobsWithIDsOverview =>
         try {
-          sender ! createJobsWithIDsOverview()
+          sender ! decorateMessage(createJobsWithIDsOverview())
         }
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
@@ -176,7 +176,7 @@ class MemoryArchivist(private val max_entries: Int)
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
         
-        theSender ! new MultipleJobsDetails(null, details)
+        theSender ! decorateMessage(new MultipleJobsDetails(null, details))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 1ed46f2..46fb694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -71,8 +71,6 @@ public class JsonGeneratorTest {
 			
 			String plan = JsonPlanGenerator.generatePlan(jg);
 			assertNotNull(plan);
-			
-			System.out.println(plan);
 
 			// validate the produced JSON
 			ObjectMapper m = new ObjectMapper();

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index b70853e..8d281f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
@@ -47,9 +48,9 @@ public class WebMonitorMessagesTest {
 			GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
 			GenericMessageTester.testMessageInstance(RequestJobsWithIDsOverview.getInstance());
 			GenericMessageTester.testMessageInstance(RequestStatusOverview.getInstance());
+			GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
 
-			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobsOverview.class, rnd));
-			
+			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobDetails.class, rnd));
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(StatusOverview.class, rnd));
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index c552700..121bc88 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -58,19 +58,19 @@ public class JsonJobGraphGenerationTest {
 	private PrintStream out;
 	private PrintStream err;
 
-//	@Before
-//	public void redirectStreams() {
-//		this.out = System.out;
-//		this.err = System.err;
-//
-//		OutputStream discards = new OutputStream() {
-//			@Override
-//			public void write(int b) {}
-//		};
-//		
-//		System.setOut(new PrintStream(discards));
-//		System.setErr(new PrintStream(discards));
-//	}
+	@Before
+	public void redirectStreams() {
+		this.out = System.out;
+		this.err = System.err;
+
+		OutputStream discards = new OutputStream() {
+			@Override
+			public void write(int b) {}
+		};
+		
+		System.setOut(new PrintStream(discards));
+		System.setErr(new PrintStream(discards));
+	}
 	
 	@After
 	public void restoreStreams() {
@@ -228,8 +228,6 @@ public class JsonJobGraphGenerationTest {
 
 		@Override
 		public void validtateJson(String json) throws Exception {
-			System.out.println(json);
-			
 			final Map<String, JsonNode> idToNode = new HashMap<>();
 			
 			// validate the produced JSON