You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:56 UTC
[24/51] [abbrv] flink git commit: [FLINK-2687] [monitoring API]
Extend vertex requests with subtask data and accumulators
[FLINK-2687] [monitoring API] Extend vertex requests with subtask data and accumulators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99a351e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99a351e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99a351e4
Branch: refs/heads/master
Commit: 99a351e4ef3048d8beac9f287d67c6f3ce3680b0
Parents: ba31f83
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 21 18:19:24 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:51 2015 +0200
----------------------------------------------------------------------
.../common/accumulators/AccumulatorHelper.java | 11 +-
.../runtime/webmonitor/WebRuntimeMonitor.java | 15 ++-
.../handlers/CurrentJobsOverviewHandler.java | 11 +-
.../handlers/DashboardConfigHandler.java | 8 ++
.../handlers/JobAccumulatorsHandler.java | 65 +++++++++
.../webmonitor/handlers/JobDetailsHandler.java | 34 ++++-
.../handlers/JobVertexAccumulatorsHandler.java | 82 ++++++++++++
.../handlers/JobVertexDetailsHandler.java | 134 +++++++++++++++++++
.../handlers/SubtasksTimesHandler.java | 3 +-
.../StringifiedAccumulatorResult.java | 26 ++++
.../runtime/executiongraph/ExecutionGraph.java | 21 +--
.../executiongraph/ExecutionJobVertex.java | 67 ++++++----
.../jobgraph/jsonplan/JsonPlanGenerator.java | 1 -
.../runtime/jobmanager/MemoryArchivist.scala | 6 +-
.../jobgraph/jsonplan/JsonGeneratorTest.java | 2 -
.../messages/WebMonitorMessagesTest.java | 5 +-
.../jsonplan/JsonJobGraphGenerationTest.java | 28 ++--
17 files changed, 431 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index bb48bdd..72670bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -58,7 +58,7 @@ public class AccumulatorHelper {
/**
* Workaround method for type safety
*/
- private static final <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
+ private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
Accumulator<?, ?> toMerge) {
@SuppressWarnings("unchecked")
Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
@@ -104,7 +104,7 @@ public class AccumulatorHelper {
public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
Map<String, Object> resultMap = new HashMap<String, Object>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
- resultMap.put(entry.getKey(), (Object) entry.getValue().getLocalValue());
+ resultMap.put(entry.getKey(), entry.getValue().getLocalValue());
}
return resultMap;
}
@@ -112,8 +112,8 @@ public class AccumulatorHelper {
public static String getResultsFormated(Map<String, Object> map) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, Object> entry : map.entrySet()) {
- builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName()
- + ")" + ": " + entry.getValue().toString() + "\n");
+ builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
+ builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
}
return builder.toString();
}
@@ -127,8 +127,7 @@ public class AccumulatorHelper {
}
}
- public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
- ?>> accumulators) {
+ public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 1c1b723..8aa25dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -35,12 +35,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
@@ -139,16 +142,22 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
+ .GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
+
+// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt", handler(null))
+// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt/accumulators", handler(null))
.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
- // the handler for the legacy requests
- .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+ // the handler for the legacy requests
+ .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
- // this handler serves all the static contents
+ // this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d792ff4..6444e4b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -61,6 +61,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+ final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
@@ -70,20 +71,20 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
if (includeRunningJobs && includeFinishedJobs) {
gen.writeArrayFieldStart("running");
for (JobDetails detail : result.getRunningJobs()) {
- generateSingleJobDetails(detail, gen);
+ generateSingleJobDetails(detail, gen, now);
}
gen.writeEndArray();
gen.writeArrayFieldStart("finished");
for (JobDetails detail : result.getFinishedJobs()) {
- generateSingleJobDetails(detail, gen);
+ generateSingleJobDetails(detail, gen, now);
}
gen.writeEndArray();
}
else {
gen.writeArrayFieldStart("jobs");
for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
- generateSingleJobDetails(detail, gen);
+ generateSingleJobDetails(detail, gen, now);
}
gen.writeEndArray();
}
@@ -97,7 +98,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
}
}
- private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen) throws Exception {
+ private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception {
gen.writeStartObject();
gen.writeStringField("jid", details.getJobId().toString());
@@ -106,7 +107,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
gen.writeNumberField("start-time", details.getStartTime());
gen.writeNumberField("end-time", details.getEndTime());
- gen.writeNumberField("duration", details.getEndTime() <= 0 ? -1L : details.getEndTime() - details.getStartTime());
+ gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
gen.writeNumberField("last-modification", details.getLastUpdateTime());
gen.writeObjectFieldStart("tasks");
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index c21c35d..ad72f0a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import java.io.StringWriter;
import java.util.Map;
@@ -46,6 +47,13 @@ public class DashboardConfigHandler implements RequestHandler, RequestHandler.Js
gen.writeNumberField("refresh-interval", refreshInterval);
gen.writeNumberField("timezone-offset", timeZoneOffset);
gen.writeStringField("timezone-name", timeZoneName);
+ gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
+
+ EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+ if (revision != null) {
+ gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
+ }
+
gen.writeEndObject();
gen.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..2732497
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns the aggregated user accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+ public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+ gen.writeStartObject();
+
+ gen.writeArrayFieldStart("job-accumulators");
+ // empty for now
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("user-task-accumulators");
+ for (StringifiedAccumulatorResult acc : allAccumulators) {
+ gen.writeStartObject();
+ gen.writeStringField("name", acc.getName());
+ gen.writeStringField("type", acc.getType());
+ gen.writeStringField("value", acc.getValue());
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index feca11b..25b0f19 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -34,7 +34,14 @@ import java.io.StringWriter;
import java.util.Map;
/**
- * Request handler that returns the JSON program plan of a job graph.
+ * Request handler that returns details about a job, including:
+ * <ul>
+ * <li>Dataflow plan</li>
+ * <li>id, name, and current status</li>
+ * <li>start time, end time, duration</li>
+ * <li>number of job vertices in each state (pending, running, finished, failed)</li>
+ * <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
+ * </ul>
*/
public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
@@ -63,6 +70,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
gen.writeNumberField("start-time", jobStartTime);
gen.writeNumberField("end-time", jobEndTime);
gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
+ gen.writeNumberField("now", now);
// timestamps
gen.writeObjectFieldStart("timestamps");
@@ -71,9 +79,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
}
gen.writeEndObject();
- final int[] tasksPerStatusTotal = new int[ExecutionState.values().length];
-
// job vertices
+ int[] jobVerticesPerState = new int[ExecutionState.values().length];
gen.writeArrayFieldStart("vertices");
for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
@@ -85,7 +92,6 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
for (ExecutionVertex vertex : ejv.getTaskVertices()) {
final ExecutionState state = vertex.getExecutionState();
tasksPerState[state.ordinal()]++;
- tasksPerStatusTotal[state.ordinal()]++;
// take the earliest start time
long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
@@ -112,7 +118,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
endTime = -1L;
duration = -1L;
}
-
+
+ ExecutionState jobVertexState =
+ ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+ jobVerticesPerState[jobVertexState.ordinal()]++;
+
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = ejv.getAggregatedMetricAccumulators();
LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
@@ -124,7 +134,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
gen.writeStringField("id", ejv.getJobVertexId().toString());
gen.writeStringField("name", ejv.getJobVertex().getName());
gen.writeNumberField("parallelism", ejv.getParallelism());
-
+ gen.writeStringField("status", jobVertexState.name());
+
gen.writeNumberField("start-time", startTime);
gen.writeNumberField("end-time", endTime);
gen.writeNumberField("duration", duration);
@@ -141,11 +152,20 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler impl
gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
gen.writeEndObject();
-
+
gen.writeEndObject();
}
gen.writeEndArray();
+ gen.writeObjectFieldStart("status-counts");
+ for (ExecutionState state : ExecutionState.values()) {
+ gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
+ }
+ gen.writeEndObject();
+
+ gen.writeFieldName("plan");
+ gen.writeRawValue(graph.getJsonPlan());
+
gen.writeEndObject();
gen.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
new file mode 100644
index 0000000..8554a31
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+
+public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+ public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ String vidString = params.get("vertexid");
+ if (vidString == null) {
+ throw new IllegalArgumentException("vertexId parameter missing");
+ }
+
+ JobVertexID vid;
+ try {
+ vid = JobVertexID.fromHexString(vidString);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
+ }
+
+ ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+ if (jobVertex == null) {
+ throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+ }
+
+ StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+
+ gen.writeArrayFieldStart("user-accumulators");
+ for (StringifiedAccumulatorResult acc : accs) {
+ gen.writeStartObject();
+ gen.writeStringField("name", acc.getName());
+ gen.writeStringField("type", acc.getType());
+ gen.writeStringField("value", acc.getValue());
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
new file mode 100644
index 0000000..7815f49
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+
+public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+ public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ String vidString = params.get("vertexid");
+ if (vidString == null) {
+ throw new IllegalArgumentException("vertexId parameter missing");
+ }
+
+ JobVertexID vid;
+ try {
+ vid = JobVertexID.fromHexString(vidString);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
+ }
+
+ ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+ if (jobVertex == null) {
+ throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+ }
+
+ final long now = System.currentTimeMillis();
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+ gen.writeStartObject();
+
+ gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+ gen.writeStringField("name", jobVertex.getJobVertex().getName());
+ gen.writeNumberField("parallelism", jobVertex.getParallelism());
+ gen.writeNumberField("now", now);
+
+ gen.writeArrayFieldStart("subtasks");
+ int num = 0;
+ for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ final ExecutionState status = vertex.getExecutionState();
+
+ InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+ long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+ if (startTime == 0) {
+ startTime = -1;
+ }
+ long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
+ long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+ LongCounter readBytes;
+ LongCounter writeBytes;
+ LongCounter readRecords;
+ LongCounter writeRecords;
+
+ if (metrics != null) {
+ readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+ writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+ readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+ writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+ }
+ else {
+ readBytes = null;
+ writeBytes = null;
+ readRecords = null;
+ writeRecords = null;
+ }
+
+ gen.writeStartObject();
+ gen.writeNumberField("subtask", num);
+ gen.writeStringField("status", status.name());
+ gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber());
+ gen.writeStringField("host", locationString);
+ gen.writeNumberField("start-time", startTime);
+ gen.writeNumberField("end-time", endTime);
+ gen.writeNumberField("duration", duration);
+
+ gen.writeObjectFieldStart("metrics");
+ gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+ gen.writeEndObject();
+
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 4d268ef..8cd7b44 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -68,7 +68,8 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
gen.writeStringField("name", jobVertex.getJobVertex().getName());
-
+ gen.writeNumberField("now", System.currentTimeMillis());
+
gen.writeArrayFieldStart("subtasks");
int num = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index 3b25fe0..ea01acd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -18,6 +18,10 @@
package org.apache.flink.runtime.accumulators;
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+import java.util.Map;
+
/**
* Container class that transports the result of an accumulator as set of strings.
*/
@@ -46,4 +50,26 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{
public String getValue() {
return value;
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
+ StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
+
+ int i = 0;
+ for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
+ StringifiedAccumulatorResult result;
+ Accumulator<?, ?> value = entry.getValue();
+ if (value != null) {
+ result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+ } else {
+ result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+ }
+
+ results[i++] = result;
+ }
+ return results;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 47175a9..70a49fd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -584,27 +584,8 @@ public class ExecutionGraph implements Serializable {
* @return an Array containing the StringifiedAccumulatorResult objects
*/
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
-
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
-
- int num = accumulatorMap.size();
- StringifiedAccumulatorResult[] resultStrings = new StringifiedAccumulatorResult[num];
-
- int i = 0;
- for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-
- StringifiedAccumulatorResult result;
- Accumulator<?, ?> value = entry.getValue();
- if (value != null) {
- result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
- }
-
- resultStrings[i++] = result;
- }
-
- return resultStrings;
+ return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2d6af6f..53d94d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
@@ -27,6 +28,7 @@ import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -218,33 +220,12 @@ public class ExecutionJobVertex implements Serializable {
}
public ExecutionState getAggregateState() {
-
int[] num = new int[ExecutionState.values().length];
-
for (ExecutionVertex vertex : this.taskVertices) {
num[vertex.getExecutionState().ordinal()]++;
}
-
- if (num[ExecutionState.FAILED.ordinal()] > 0) {
- return ExecutionState.FAILED;
- }
- if (num[ExecutionState.CANCELING.ordinal()] > 0) {
- return ExecutionState.CANCELING;
- }
- else if (num[ExecutionState.CANCELED.ordinal()] > 0) {
- return ExecutionState.CANCELED;
- }
- else if (num[ExecutionState.RUNNING.ordinal()] > 0) {
- return ExecutionState.RUNNING;
- }
- else if (num[ExecutionState.FINISHED.ordinal()] > 0) {
- return num[ExecutionState.FINISHED.ordinal()] == parallelism ?
- ExecutionState.FINISHED : ExecutionState.RUNNING;
- }
- else {
- // all else collapses under created
- return ExecutionState.CREATED;
- }
+
+ return getAggregateJobVertexState(num, parallelism);
}
//---------------------------------------------------------------------------------------------
@@ -528,6 +509,19 @@ public class ExecutionJobVertex implements Serializable {
return agg;
}
+ public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+ Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+
+ for (ExecutionVertex vertex : taskVertices) {
+ Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+ if (next != null) {
+ AccumulatorHelper.mergeInto(userAccumulators, next);
+ }
+ }
+
+ return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
+ }
+
// --------------------------------------------------------------------------------------------
// Static / pre-assigned input splits
// --------------------------------------------------------------------------------------------
@@ -665,4 +659,31 @@ public class ExecutionJobVertex implements Serializable {
}
}
}
+
+ public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
+ if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
+ throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
+ }
+
+ if (verticesPerState[ExecutionState.FAILED.ordinal()] > 0) {
+ return ExecutionState.FAILED;
+ }
+ if (verticesPerState[ExecutionState.CANCELING.ordinal()] > 0) {
+ return ExecutionState.CANCELING;
+ }
+ else if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
+ return ExecutionState.CANCELED;
+ }
+ else if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
+ return ExecutionState.RUNNING;
+ }
+ else if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
+ return verticesPerState[ExecutionState.FINISHED.ordinal()] == parallelism ?
+ ExecutionState.FINISHED : ExecutionState.RUNNING;
+ }
+ else {
+ // all else collapses under created
+ return ExecutionState.CREATED;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
index 0844dab..bd5357e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
@@ -41,7 +41,6 @@ public class JsonPlanGenerator {
final JsonFactory factory = new JsonFactory();
final JsonGenerator gen = factory.createGenerator(writer);
- gen.useDefaultPrettyPrinter();
// start of everything
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 55a7ccb..d3fc8b1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -157,7 +157,7 @@ class MemoryArchivist(private val max_entries: Int)
message match {
case _ : RequestJobsOverview =>
try {
- sender ! createJobsOverview()
+ sender ! decorateMessage(createJobsOverview())
}
catch {
case t: Throwable => log.error("Exception while creating the jobs overview", t)
@@ -165,7 +165,7 @@ class MemoryArchivist(private val max_entries: Int)
case _ : RequestJobsWithIDsOverview =>
try {
- sender ! createJobsWithIDsOverview()
+ sender ! decorateMessage(createJobsWithIDsOverview())
}
catch {
case t: Throwable => log.error("Exception while creating the jobs overview", t)
@@ -176,7 +176,7 @@ class MemoryArchivist(private val max_entries: Int)
v => WebMonitorUtils.createDetailsForJob(v)
}.toArray[JobDetails]
- theSender ! new MultipleJobsDetails(null, details)
+ theSender ! decorateMessage(new MultipleJobsDetails(null, details))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 1ed46f2..46fb694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -71,8 +71,6 @@ public class JsonGeneratorTest {
String plan = JsonPlanGenerator.generatePlan(jg);
assertNotNull(plan);
-
- System.out.println(plan);
// validate the produced JSON
ObjectMapper m = new ObjectMapper();
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index b70853e..8d281f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
@@ -47,9 +48,9 @@ public class WebMonitorMessagesTest {
GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
GenericMessageTester.testMessageInstance(RequestJobsWithIDsOverview.getInstance());
GenericMessageTester.testMessageInstance(RequestStatusOverview.getInstance());
+ GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
- GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobsOverview.class, rnd));
-
+ GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobDetails.class, rnd));
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(StatusOverview.class, rnd));
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
http://git-wip-us.apache.org/repos/asf/flink/blob/99a351e4/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index c552700..121bc88 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -58,19 +58,19 @@ public class JsonJobGraphGenerationTest {
private PrintStream out;
private PrintStream err;
-// @Before
-// public void redirectStreams() {
-// this.out = System.out;
-// this.err = System.err;
-//
-// OutputStream discards = new OutputStream() {
-// @Override
-// public void write(int b) {}
-// };
-//
-// System.setOut(new PrintStream(discards));
-// System.setErr(new PrintStream(discards));
-// }
+ @Before
+ public void redirectStreams() {
+ this.out = System.out;
+ this.err = System.err;
+
+ OutputStream discards = new OutputStream() {
+ @Override
+ public void write(int b) {}
+ };
+
+ System.setOut(new PrintStream(discards));
+ System.setErr(new PrintStream(discards));
+ }
@After
public void restoreStreams() {
@@ -228,8 +228,6 @@ public class JsonJobGraphGenerationTest {
@Override
public void validtateJson(String json) throws Exception {
- System.out.println(json);
-
final Map<String, JsonNode> idToNode = new HashMap<>();
// validate the produced JSON