You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:20 UTC
[48/51] [abbrv] flink git commit: [FLINK-2687] [monitoring api] Add
handlers for subtask details and accumulators
[FLINK-2687] [monitoring api] Add handlers for subtask details and accumulators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3f79172
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3f79172
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3f79172
Branch: refs/heads/master
Commit: b3f791727c0713251a4b02a766eb177eb9327113
Parents: 616b7d5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 10 15:47:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:52 2015 +0200
----------------------------------------------------------------------
flink-runtime-web/pom.xml | 18 -
.../runtime/webmonitor/WebRuntimeMonitor.java | 26 +-
.../AbstractExecutionGraphRequestHandler.java | 4 +-
.../AbstractJobVertexRequestHandler.java | 62 ++
.../AbstractSubtaskAttemptRequestHandler.java | 68 ++
.../handlers/AbstractSubtaskRequestHandler.java | 62 ++
.../handlers/JobVertexAccumulatorsHandler.java | 24 +-
.../handlers/JobVertexDetailsHandler.java | 32 +-
.../webmonitor/handlers/JsonFactory.java | 7 +-
.../SubtaskCurrentAttemptDetailsHandler.java | 39 ++
...taskExecutionAttemptAccumulatorsHandler.java | 68 ++
.../SubtaskExecutionAttemptDetailsHandler.java | 101 +++
.../SubtasksAllAccumulatorsHandler.java | 83 +++
.../handlers/SubtasksTimesHandler.java | 47 +-
.../legacy/JobManagerInfoHandler.java | 702 -------------------
.../runtime/webmonitor/legacy/JsonFactory.java | 112 ---
.../runtime/webmonitor/runner/TestRunner.java | 199 ------
.../StringifiedAccumulatorResult.java | 31 +-
.../flink/runtime/executiongraph/Execution.java | 14 +
.../runtime/executiongraph/ExecutionVertex.java | 9 +
20 files changed, 579 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 0a05111..804e6da 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -90,24 +90,6 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
-
- <!-- ===================================================
- Testing
- =================================================== -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 8aa25dd..4633dcf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -47,8 +47,11 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
-import org.apache.flink.runtime.webmonitor.legacy.JobManagerInfoHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,14 +64,16 @@ import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
- * The root component of the web runtime monitor.
- *
- * <p>The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
+ * The root component of the web runtime monitor. This class starts the web server and creates
+ * all request handlers for the REST API.
+ * <p>
+ * The web runtime monitor is based on Netty HTTP. It uses the Netty-Router library to route
* HTTP requests of different paths to different response handlers. In addition, it serves the static
- * files of the web frontend, such as HTML, CSS, or JS files.</p>
+ * files of the web frontend, such as HTML, CSS, or JS files.
*/
public class WebRuntimeMonitor implements WebMonitor {
+ /** By default, all requests to the JobManager have a timeout of 10 seconds */
public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
/** Logger for web frontend startup / shutdown messages */
@@ -146,18 +151,17 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
-// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt", handler(null))
-// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt/accumulators", handler(null))
+ .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
- // the handler for the legacy requests
- .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
-
- // this handler serves all the static contents
+ // this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 01f48e4..d9b4e59 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -40,7 +40,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
@Override
- public String handleRequest(Map<String, String> params) throws Exception {
+ public final String handleRequest(Map<String, String> params) throws Exception {
String jidString = params.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
@@ -56,7 +56,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid);
if (eg == null) {
- throw new NotFoundException("Could not find execution graph for job " + jid);
+ throw new NotFoundException("Could not find job with id " + jid);
}
return handleRequest(eg, params);
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
new file mode 100644
index 0000000..5b12907
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific job vertex (defined
+ * via the "vertexid" parameter) in a specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
+
+ public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ final String vidString = params.get("vertexid");
+ if (vidString == null) {
+ throw new IllegalArgumentException("vertexId parameter missing");
+ }
+
+ final JobVertexID vid;
+ try {
+ vid = JobVertexID.fromHexString(vidString);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage()); // NOTE(review): e is not chained as the cause
+ }
+
+ final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+ if (jobVertex == null) {
+ throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists."); // NOTE(review): a missing job raises NotFoundException; confirm IAE is intended for a missing vertex
+ }
+
+ return handleRequest(jobVertex, params);
+ }
+
+ public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
new file mode 100644
index 0000000..672df16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask execution attempt
+ * (defined via the "attempt" parameter) of a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
+
+ public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception { // not final: SubtaskCurrentAttemptDetailsHandler overrides this
+ final String attemptNumberString = params.get("attempt");
+ if (attemptNumberString == null) {
+ throw new RuntimeException("Attempt number parameter missing");
+ }
+
+ final int attempt;
+ try {
+ attempt = Integer.parseInt(attemptNumberString);
+ }
+ catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid attempt number parameter"); // NOTE(review): e is not chained as the cause
+ }
+
+ final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+ if (attempt == currentAttempt.getAttemptNumber()) {
+ return handleRequest(currentAttempt, params);
+ }
+ else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) { // lower attempt number than the current one
+ Execution exec = vertex.getPriorExecutionAttempt(attempt);
+ return handleRequest(exec, params);
+ }
+ else {
+ throw new RuntimeException("Attempt does not exist: " + attempt);
+ }
+ }
+
+ public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
new file mode 100644
index 0000000..90866c6
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
+
+ public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ final String subtaskNumberString = params.get("subtasknum");
+ if (subtaskNumberString == null) {
+ throw new RuntimeException("Subtask number parameter missing");
+ }
+
+ final int subtask;
+ try {
+ subtask = Integer.parseInt(subtaskNumberString);
+ }
+ catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid subtask number parameter"); // NOTE(review): e is not chained as the cause
+ }
+
+ if (subtask < 0 || subtask >= jobVertex.getParallelism()) { // NOTE(review): assumes getTaskVertices().length == getParallelism()
+ throw new RuntimeException("subtask does not exist: " + subtask);
+ }
+
+ final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+ return handleRequest(vertex, params);
+ }
+
+ public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8554a31..ed2c541 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,41 +21,21 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
import java.util.Map;
-public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
- String vidString = params.get("vertexid");
- if (vidString == null) {
- throw new IllegalArgumentException("vertexId parameter missing");
- }
-
- JobVertexID vid;
- try {
- vid = JobVertexID.fromHexString(vidString);
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
- }
-
- ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
- if (jobVertex == null) {
- throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
- }
-
+ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7815f49..21e2868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -19,47 +19,31 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
+
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
import java.util.Map;
-
-public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+/**
+ * A request handler that provides the details of a job vertex, including id, name, parallelism,
+ * and the runtime and metrics of all its subtasks.
+ */
+public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
- String vidString = params.get("vertexid");
- if (vidString == null) {
- throw new IllegalArgumentException("vertexId parameter missing");
- }
-
- JobVertexID vid;
- try {
- vid = JobVertexID.fromHexString(vidString);
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
- }
-
- ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
- if (jobVertex == null) {
- throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
- }
-
+ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
@@ -123,6 +107,8 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandle
gen.writeEndObject();
gen.writeEndObject();
+
+ num++;
}
gen.writeEndArray();
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
index cb83361..e886532 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
@@ -18,9 +18,14 @@
package org.apache.flink.runtime.webmonitor.handlers;
-
+/**
+ * A holder for the singleton Jackson JSON factory. Since Jackson's JSON factory object
+ * is a heavyweight object that is encouraged to be shared, we use a singleton instance across
+ * all request handlers.
+ */
public class JsonFactory {
+ /** The singleton Jackson JSON factory. */
public static final com.fasterxml.jackson.core.JsonFactory jacksonFactory =
new com.fasterxml.jackson.core.JsonFactory();
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
new file mode 100644
index 0000000..d301bd1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Request handler providing details about the current execution attempt of a subtask.
+ */
+public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
+
+ public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception { // skips the base class's "attempt" parameter lookup
+ return handleRequest(vertex.getCurrentExecutionAttempt(), params);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
new file mode 100644
index 0000000..f177b47
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler providing the user-defined accumulators of a single subtask execution attempt,
+ * rendered as a JSON object with "subtask", "attempt", "id", and "user-accumulators" fields.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler implements RequestHandler.JsonResponse {
+
+ public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+ final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); // NOTE(review): iterated below without a null check; confirm it never returns null
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer); // NOTE(review): deprecated in Jackson 2.x; createGenerator(writer) is the replacement
+
+ gen.writeStartObject();
+
+ gen.writeNumberField("subtask", execAttempt.getVertex().getSubTaskIndex());
+ gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+ gen.writeStringField("id", execAttempt.getAttemptId().toShortString());
+
+ gen.writeArrayFieldStart("user-accumulators");
+ for (StringifiedAccumulatorResult acc : accs) {
+ gen.writeStartObject();
+ gen.writeStringField("name", acc.getName());
+ gen.writeStringField("type", acc.getType());
+ gen.writeStringField("value", acc.getValue());
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
new file mode 100644
index 0000000..41aa929
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler providing details about a single task execution attempt.
+ */
+public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler implements RequestHandler.JsonResponse {
+
+ public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+ final ExecutionState status = execAttempt.getState();
+ final long now = System.currentTimeMillis();
+
+ InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+ long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+ if (startTime == 0) {
+ startTime = -1;
+ }
+ long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
+ long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators();
+ LongCounter readBytes;
+ LongCounter writeBytes;
+ LongCounter readRecords;
+ LongCounter writeRecords;
+
+ if (metrics != null) {
+ readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+ writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+ readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+ writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+ }
+ else {
+ readBytes = null;
+ writeBytes = null;
+ readRecords = null;
+ writeRecords = null;
+ }
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeNumberField("subtask", execAttempt.getVertex().getSubTaskIndex());
+ gen.writeStringField("status", status.name());
+ gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+ gen.writeStringField("host", locationString);
+ gen.writeNumberField("start-time", startTime);
+ gen.writeNumberField("end-time", endTime);
+ gen.writeNumberField("duration", duration);
+
+ gen.writeObjectFieldStart("metrics");
+ gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
+ gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+ gen.writeEndObject();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..4f97292
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns the accumulators for all subtasks of a job vertex.
+ *
+ * <p>The response holds the vertex id and parallelism, plus one entry per subtask with its index,
+ * its current attempt number, host, and the stringified user accumulators of that attempt.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
+
+	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+		gen.writeNumberField("parallelism", jobVertex.getParallelism());
+
+		gen.writeArrayFieldStart("subtasks");
+
+		int num = 0;
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			// Fetch the current attempt once and read every field from that snapshot. Asking the
+			// vertex for its "current" attempt several times could mix data from two different
+			// attempts if the current attempt is replaced (e.g. on a restart) mid-request.
+			final Execution attempt = vertex.getCurrentExecutionAttempt();
+
+			InstanceConnectionInfo location = attempt.getAssignedResourceLocation();
+			String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+			gen.writeStartObject();
+
+			gen.writeNumberField("subtask", num++);
+			gen.writeNumberField("attempt", attempt.getAttemptNumber());
+			gen.writeStringField("host", locationString);
+
+			StringifiedAccumulatorResult[] accs = attempt.getUserAccumulatorsStringified();
+			gen.writeArrayFieldStart("user-accumulators");
+			// defensive: a missing result is rendered as an empty array instead of failing the request
+			if (accs != null) {
+				for (StringifiedAccumulatorResult acc : accs) {
+					gen.writeStartObject();
+					gen.writeStringField("name", acc.getName());
+					gen.writeStringField("type", acc.getType());
+					gen.writeStringField("value", acc.getValue());
+					gen.writeEndObject();
+				}
+			}
+			gen.writeEndArray();
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		gen.writeEndObject();
+		gen.close();
+		return writer.toString();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 8cd7b44..b6fa136 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -19,21 +19,21 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
+
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
import java.util.Map;
/**
- * Request handler that returns the JSON program plan of a job graph.
+ * Request handler that returns the state transition timestamps for all subtasks, plus their
+ * location and duration.
*/
-public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler implements RequestHandler.JsonResponse {
public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
@@ -41,25 +41,8 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
- String vidString = params.get("vertexid");
- if (vidString == null) {
- throw new IllegalArgumentException("vertexId parameter missing");
- }
-
- JobVertexID vid;
- try {
- vid = JobVertexID.fromHexString(vidString);
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
- }
-
- ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
- if (jobVertex == null) {
- throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
- }
-
+ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
@@ -68,25 +51,37 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphRequestHandler i
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
gen.writeStringField("name", jobVertex.getJobVertex().getName());
- gen.writeNumberField("now", System.currentTimeMillis());
+ gen.writeNumberField("now", now);
gen.writeArrayFieldStart("subtasks");
int num = 0;
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+
+ long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
+ ExecutionState status = vertex.getExecutionState();
+
+ long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];
+
+ long start = scheduledTime > 0 ? scheduledTime : -1;
+ long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
+ long duration = start >= 0 ? end - start : -1L;
+
gen.writeStartObject();
- gen.writeNumberField("subtask", num);
+ gen.writeNumberField("subtask", num++);
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
gen.writeStringField("host", locationString);
+ gen.writeNumberField("duration", duration);
+
gen.writeObjectFieldStart("timestamps");
- long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
for (ExecutionState state : ExecutionState.values()) {
gen.writeNumberField(state.name(), timestamps[state.ordinal()]);
}
gen.writeEndObject();
+
gen.writeEndObject();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
deleted file mode 100644
index c65bc0f..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.legacy;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.router.KeepAliveWrite;
-import io.netty.handler.codec.http.router.Routed;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Tuple3;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-@ChannelHandler.Sharable
-public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoHandler.class);
-
-	/** Charset used for every response body produced by this handler. */
-	private static final Charset ENCODING = Charset.forName("UTF-8");
-
-	/** Gateway to the JobManager (running jobs, task managers, slots, cancellation, accumulators). */
-	private final ActorGateway jobmanager;
-	/** Gateway to the job archive (archived jobs, job counts, single archived jobs). */
-	private final ActorGateway archive;
-	/** Timeout for every ask to, and every wait on, the two gateways above. */
-	private final FiniteDuration timeout;
-
-
-	public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
-		this.timeout = timeout;
-		this.archive = archive;
-		this.jobmanager = jobmanager;
-	}
-
-	/**
-	 * Answers one routed HTTP request with the JSON produced by {@link #handleRequest(Routed)},
-	 * or with a plain-text stack trace and status 500 if producing it fails.
-	 *
-	 * <p>The body's charset is declared as a parameter of the Content-Type header. Content-Encoding
-	 * is intentionally not set: it names a content coding (e.g. gzip), not a charset, so a value of
-	 * "utf-8" there is invalid and may make clients try to decode the body.
-	 */
-	@Override
-	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		DefaultFullHttpResponse response;
-		try {
-			String result = handleRequest(routed);
-			byte[] bytes = result.getBytes(ENCODING);
-
-			response = new DefaultFullHttpResponse(
-				HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-		}
-		catch (Exception e) {
-			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-				HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-		}
-
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-		KeepAliveWrite.flush(ctx, routed.request(), response);
-	}
-
-
- /**
-  * Dispatches on the "get" query parameter and returns the JSON answer for it. Handled values:
-  * "archive", "jobcounts", "job", "groupvertex", "taskmanagers", "cancel", "updates", "version";
-  * any other (or missing) value returns the list of currently running jobs.
-  * Branches that ask an actor block on the reply for up to {@code timeout} via Await.
-  */
- @SuppressWarnings("unchecked")
- private String handleRequest(Routed routed) throws Exception {
- // "archive": all jobs held by the archive, serialized by writeJsonForArchive
- if ("archive".equals(routed.queryParam("get"))) {
- Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
-
- Object result = Await.result(response, timeout);
-
- if(!(result instanceof ArchiveMessages.ArchivedJobs)) {
- throw new RuntimeException("RequestArchiveJobs requires a response of type " +
- "ArchivedJobs. Instead the response is of type " + result.getClass() +".");
- }
- else {
- // copied into a mutable list because writeJsonForArchive sorts its argument in place
- final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
- ((ArchiveMessages.ArchivedJobs) result).asJavaCollection());
-
- return writeJsonForArchive(archivedJobs);
- }
- }
- // "jobcounts": the archive replies with a Tuple3 of counts
- else if ("jobcounts".equals(routed.queryParam("get"))) {
- Future<Object> response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
-
- Object result = Await.result(response, timeout);
-
- if (!(result instanceof Tuple3)) {
- throw new RuntimeException("RequestJobCounts requires a response of type " +
- "Tuple3. Instead the response is of type " + result.getClass() +
- ".");
- }
- else {
- // unchecked cast; the element types are not verified at runtime
- return writeJsonForJobCounts((Tuple3<Integer, Integer, Integer>) result);
- }
- }
- // "job": details of one archived job, identified by the hex "job" query parameter
- else if ("job".equals(routed.queryParam("get"))) {
- String jobId = routed.queryParam("job");
-
- Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
- timeout);
-
- Object result = Await.result(response, timeout);
-
- if (!(result instanceof JobManagerMessages.JobResponse)){
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + result.getClass());
- }
- else {
- final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
-
- if (jobResponse instanceof JobManagerMessages.JobFound){
- ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)result).executionGraph();
- return writeJsonForArchivedJob(archivedJob);
- }
- else {
- throw new Exception("DoGet:job: Could not find job for job ID " + jobId);
- }
- }
- }
- // "groupvertex": one job vertex of an archived job ("job" and "groupvertex" query parameters)
- else if ("groupvertex".equals(routed.queryParam("get"))) {
- String jobId = routed.queryParam("job");
- String groupVertexId = routed.queryParam("groupvertex");
-
- // No group vertex specified (the client sends the literal string "null").
- // NOTE(review): if queryParam returns null for an absent parameter, the equals call below
- // throws NullPointerException instead of this explicit error — confirm.
- if (groupVertexId.equals("null")) {
- throw new Exception("Found null groupVertexId");
- }
-
- Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
- timeout);
-
- Object result = Await.result(response, timeout);
-
- if (!(result instanceof JobManagerMessages.JobResponse)){
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + result.getClass());
- }
- else {
- final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
-
- if (jobResponse instanceof JobManagerMessages.JobFound) {
- ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)jobResponse).executionGraph();
-
- return writeJsonForArchivedJobGroupvertex(archivedJob, JobVertexID.fromHexString(groupVertexId));
- }
- else {
- throw new Exception("DoGet:groupvertex: Could not find job for job ID " + jobId);
- }
- }
- }
- // "taskmanagers": two sequential asks to the JobManager (registered TMs, then total slots)
- else if ("taskmanagers".equals(routed.queryParam("get"))) {
- Future<Object> response = jobmanager.ask(
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- timeout);
-
- Object result = Await.result(response, timeout);
-
- if (!(result instanceof Integer)) {
- throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
- "response of type Integer. Instead the response is of type " +
- result.getClass() + ".");
- }
- else {
- final int numberOfTaskManagers = (Integer)result;
-
- final Future<Object> responseRegisteredSlots = jobmanager.ask(
- JobManagerMessages.getRequestTotalNumberOfSlots(),
- timeout);
-
- final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
- timeout);
-
- if (!(resultRegisteredSlots instanceof Integer)) {
- throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
- "type Integer. Instaed the response of type " +
- resultRegisteredSlots.getClass() + ".");
- }
- else {
- final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
-
- return "{\"taskmanagers\": " + numberOfTaskManagers + ", " +
- "\"slots\": " + numberOfRegisteredSlots + "}";
- }
- }
- }
- // "cancel": sends CancelJob and waits for a reply, but ignores its content; always returns "{}"
- else if ("cancel".equals(routed.queryParam("get"))) {
- String jobId = routed.queryParam("job");
-
- Future<Object> response = jobmanager.ask(new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
- timeout);
-
- Await.ready(response, timeout);
- return "{}";
- }
- else if ("updates".equals(routed.queryParam("get"))) {
- String jobId = routed.queryParam("job");
- return writeJsonUpdatesForJob(JobID.fromHexString(jobId));
- }
- else if ("version".equals(routed.queryParam("get"))) {
- return writeJsonForVersion();
- }
- // default (unknown or missing "get"): jobs currently running on the JobManager
- else{
- Future<Object> response = jobmanager.ask(JobManagerMessages.getRequestRunningJobs(),
- timeout);
-
- Object result = Await.result(response, timeout);
-
- if(!(result instanceof JobManagerMessages.RunningJobs)){
- throw new RuntimeException("RequestRunningJobs requires a response of type " +
- "RunningJobs. Instead the response of type " + result.getClass() + ".");
- }
- else {
- final Iterable<ExecutionGraph> runningJobs =
- ((JobManagerMessages.RunningJobs) result).asJavaIterable();
-
- return writeJsonForJobs(runningJobs);
- }
- }
- }
-
- private String writeJsonForJobs(Iterable<ExecutionGraph> graphs) {
- StringBuilder bld = new StringBuilder();
- bld.append("[");
-
- Iterator<ExecutionGraph> it = graphs.iterator();
- // Loop Jobs
- while(it.hasNext()){
- ExecutionGraph graph = it.next();
-
- writeJsonForJob(bld, graph);
-
- //Write seperator between json objects
- if(it.hasNext()) {
- bld.append(",");
- }
- }
- bld.append("]");
-
- return bld.toString();
- }
-
-	/**
-	 * Appends one JSON object describing a job: id, name, current status, the timestamp of that
-	 * status, and the JSON form of each job vertex in topological order ("groupvertices").
-	 */
-	private void writeJsonForJob(StringBuilder bld, ExecutionGraph graph) {
-		bld.append("{")
-			.append("\"jobid\": \"").append(graph.getJobID()).append("\",")
-			.append("\"jobname\": \"").append(graph.getJobName()).append("\",")
-			.append("\"status\": \"").append(graph.getState()).append("\",")
-			.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())).append(",")
-			.append("\"groupvertices\": [");
-
-		String separator = "";
-		for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
-			bld.append(separator).append(JsonFactory.toJson(vertex));
-			separator = ",";
-		}
-
-		bld.append("]").append("}");
-	}
-
- private String writeJsonForArchive(List<ExecutionGraph> graphs) {
- StringBuilder bld = new StringBuilder();
- bld.append("[");
-
- // sort jobs by time
- Collections.sort(graphs, new Comparator<ExecutionGraph>() {
- @Override
- public int compare(ExecutionGraph o1, ExecutionGraph o2) {
- if (o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
- return 1;
- } else {
- return -1;
- }
- }
-
- });
-
- // Loop Jobs
- for (int i = 0; i < graphs.size(); i++) {
- ExecutionGraph graph = graphs.get(i);
-
- //Serialize job to json
- bld.append("{");
- bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
- bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
- bld.append("\"status\": \"").append(graph.getState()).append("\",");
- bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState()));
-
- bld.append("}");
-
- //Write seperator between json objects
- if(i != graphs.size() - 1) {
- bld.append(",");
- }
- }
- bld.append("]");
- return bld.toString();
- }
-
- private String writeJsonForJobCounts(Tuple3<Integer, Integer, Integer> jobCounts) {
- return "{\"finished\": " + jobCounts._1() + ",\"canceled\": " + jobCounts._2() + ",\"failed\": "
- + jobCounts._3() + "}";
- }
-
-
- /**
-  * Serializes one archived job into a one-element JSON array. The object holds: id, name,
-  * status, per-JobStatus timestamps, the failed subtasks (only if the job FAILED), the JSON of
-  * every job vertex, the execution config (if present), the accumulators (requested from the
-  * JobManager, blocking up to {@code timeout}), and the start/end times of every job vertex.
-  *
-  * NOTE(review): job names, host names, user-config entries and accumulator values are appended
-  * without JSON escaping; a quote or backslash in any of them yields invalid JSON. Failure
-  * messages are HTML-escaped via StringUtils.escapeHtml, which is presumably not a JSON escape.
-  */
- private String writeJsonForArchivedJob(ExecutionGraph graph) {
- StringBuilder bld = new StringBuilder();
-
- bld.append("[");
- bld.append("{");
- bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
- bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
- bld.append("\"status\": \"").append(graph.getState()).append("\",");
- // the "SCHEDULED" key carries the timestamp of JobStatus.CREATED
- bld.append("\"SCHEDULED\": ").append(graph.getStatusTimestamp(JobStatus.CREATED)).append(",");
- bld.append("\"RUNNING\": ").append(graph.getStatusTimestamp(JobStatus.RUNNING)).append(",");
- bld.append("\"FINISHED\": ").append(graph.getStatusTimestamp(JobStatus.FINISHED)).append(",");
- bld.append("\"FAILED\": ").append(graph.getStatusTimestamp(JobStatus.FAILED)).append(",");
- bld.append("\"CANCELED\": ").append(graph.getStatusTimestamp(JobStatus.CANCELED)).append(",");
-
- // list failed subtasks that have a known location or failure cause
- if (graph.getState() == JobStatus.FAILED) {
- bld.append("\"failednodes\": [");
- boolean first = true;
- for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
- if (vertex.getExecutionState() == ExecutionState.FAILED) {
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
- Throwable failureCause = vertex.getFailureCause();
- if (location != null || failureCause != null) {
- if (first) {
- first = false;
- } else {
- bld.append(",");
- }
- bld.append("{");
- bld.append("\"node\": \"").append(location == null ? "(none)" : location.getFQDNHostname()).append("\",");
- bld.append("\"message\": \"").append(failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))).append("\"");
- bld.append("}");
- }
- }
- }
- bld.append("],");
- }
-
- // Serialize the job vertices, in topological order, to JSON
- bld.append("\"groupvertices\": [");
- boolean first = true;
- for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
- // Write separator between JSON objects
- if (first) {
- first = false;
- } else {
- bld.append(",");
- }
-
- bld.append(JsonFactory.toJson(groupVertex));
-
- }
- bld.append("],");
-
- // write user config; the whole "executionConfig" field is skipped if the graph has none
- ExecutionConfig ec = graph.getExecutionConfig();
- if(ec != null) {
- bld.append("\"executionConfig\": {");
- bld.append("\"Execution Mode\": \"").append(ec.getExecutionMode()).append("\",");
- bld.append("\"Number of execution retries\": \"").append(ec.getNumberOfExecutionRetries()).append("\",");
- bld.append("\"Job parallelism\": \"").append(ec.getParallelism()).append("\",");
- bld.append("\"Object reuse mode\": \"").append(ec.isObjectReuseEnabled()).append("\"");
- ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
- if(uc != null) {
- Map<String, String> ucVals = uc.toMap();
- if (ucVals != null) {
- // builds the "userConfig" object by string concatenation, entries separated by ",\n"
- String ucString = "{";
- int i = 0;
- for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
- ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
- if (++i < ucVals.size()) {
- ucString += ",\n";
- }
- }
- bld.append(", \"userConfig\": ").append(ucString).append("}");
- }
- else {
- LOG.debug("GlobalJobParameters.toMap() did not return anything");
- }
- }
- else {
- LOG.debug("No GlobalJobParameters were set in the execution config");
- }
- bld.append("},");
- }
- else {
- LOG.warn("Unable to retrieve execution config from execution graph");
- }
-
- // write accumulators (asked from the JobManager, not from the archived graph)
- final Future<Object> response = jobmanager.ask(
- new RequestAccumulatorResultsStringified(graph.getJobID()),
- timeout);
-
- Object result;
- try {
- result = Await.result(response, timeout);
- }
- catch (Exception ex) {
- throw new RuntimeException("Could not retrieve the accumulator results from the job manager.", ex);
- }
-
- if (result instanceof AccumulatorResultStringsFound) {
- StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
-
- bld.append("\n\"accumulators\": [");
- int i = 0;
- for (StringifiedAccumulatorResult accumulator : accumulators) {
- bld.append("{ \"name\": \"").append(accumulator.getName()).append(" (").append(accumulator.getType()).append(")\",").append(" \"value\": \"").append(accumulator.getValue()).append("\"}\n");
- if (++i < accumulators.length) {
- bld.append(",");
- }
- }
- bld.append("],\n");
- }
- else if (result instanceof AccumulatorResultsNotFound) {
- bld.append("\n\"accumulators\": [],");
- }
- // on an erroneous result the error is only logged and the "accumulators" field is omitted
- else if (result instanceof AccumulatorResultsErroneous) {
- LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
- ((AccumulatorResultsErroneous) result).cause());
- }
- else {
- throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
- "AccumulatorResultStringsFound. Instead the response is of type " +
- result.getClass() + ".");
- }
-
- // per job vertex: earliest RUNNING timestamp (STARTED) and latest terminal timestamp (ENDED)
- bld.append("\"groupverticetimes\": {");
- first = true;
-
- for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
- if (first) {
- first = false;
- } else {
- bld.append(",");
- }
-
- // Calculate start and end time for the job vertex. A timestamp of 0 is treated as
- // "state never reached": if no subtask reached RUNNING, STARTED stays Long.MAX_VALUE;
- // if none reached FINISHED/CANCELED/FAILED, ENDED stays 0.
- long started = Long.MAX_VALUE;
- long ended = 0;
-
- // Take the earliest RUNNING timestamp and the latest terminal timestamp over all subtasks
- for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
- long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
- if (running != 0 && running < started) {
- started = running;
- }
-
- long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
- long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
- long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-
- if (finished != 0 && finished > ended) {
- ended = finished;
- }
-
- if (canceled != 0 && canceled > ended) {
- ended = canceled;
- }
-
- if (failed != 0 && failed > ended) {
- ended = failed;
- }
-
- }
-
- bld.append("\"").append(groupVertex.getJobVertexId()).append("\": {");
- bld.append("\"groupvertexid\": \"").append(groupVertex.getJobVertexId()).append("\",");
- // uses the job vertex's toString() as its display name
- bld.append("\"groupvertexname\": \"").append(groupVertex).append("\",");
- bld.append("\"STARTED\": ").append(started).append(",");
- bld.append("\"ENDED\": ").append(ended);
- bld.append("}");
-
- }
-
- // close "groupverticetimes", the job object, and the enclosing array
- bld.append("}");
- bld.append("}");
- bld.append("]");
-
- return bld.toString();
- }
-
-
- /**
- * Builds the JSON status update for a job: the IDs of all jobs reported by the job
- * manager as running ("recentjobs"), followed by per-subtask state events ("vertexevents")
- * and the job-level state event ("jobevents") for {@code jobId}.
- * If the job manager does not answer with JobFound, an empty vertex-event list and a
- * synthetic FINISHED job event stamped with the current time are emitted instead.
- *
- * @param jobId the job to report on
- * @return the JSON document as a string
- * @throws RuntimeException if either job manager request fails or returns an
- * unexpected response type
- */
- private String writeJsonUpdatesForJob(JobID jobId) {
- // NOTE(review): despite the "archived" variable names and error messages below,
- // this asks the job manager for the currently RUNNING jobs (getRequestRunningJobs).
- final Future<Object> responseArchivedJobs = jobmanager.ask(
- JobManagerMessages.getRequestRunningJobs(),
- timeout);
-
- Object resultArchivedJobs;
- try{
- resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
- }
- catch (Exception ex) {
- throw new RuntimeException("Could not retrieve archived jobs from the job manager.", ex);
- }
-
- if(!(resultArchivedJobs instanceof JobManagerMessages.RunningJobs)){
- throw new RuntimeException("RequestArchivedJobs requires a response of type " +
- "RunningJobs. Instead the response is of type " +
- resultArchivedJobs.getClass() + ".");
- }
- else {
- final Iterable<ExecutionGraph> graphs = ((JobManagerMessages.RunningJobs)resultArchivedJobs).
- asJavaIterable();
-
- //Serialize job to json
- final StringBuilder bld = new StringBuilder();
-
- // Note: the timestamps in this document are written as quoted JSON strings.
- bld.append("{");
- bld.append("\"jobid\": \"").append(jobId).append("\",");
- bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\",");
- bld.append("\"recentjobs\": [");
-
- boolean first = true;
-
- for (ExecutionGraph g : graphs){
- if (first) {
- first = false;
- } else {
- bld.append(",");
- }
-
- bld.append("\"").append(g.getJobID()).append("\"");
- }
- bld.append("],");
-
- final Future<Object> responseJob = jobmanager.ask(
- new JobManagerMessages.RequestJob(jobId),
- timeout);
-
- Object resultJob;
- try{
- resultJob = Await.result(responseJob, timeout);
- }
- catch (Exception ex){
- // NOTE(review): the message lacks a space before "from", so it renders as "<id>from".
- throw new RuntimeException("Could not retrieve the job with jobID " + jobId +
- "from the job manager.", ex);
- }
-
- if (!(resultJob instanceof JobManagerMessages.JobResponse)) {
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + resultJob.getClass() + ".");
- }
- else {
- final JobManagerMessages.JobResponse response = (JobManagerMessages.JobResponse) resultJob;
-
- if (response instanceof JobManagerMessages.JobFound){
- ExecutionGraph graph = ((JobManagerMessages.JobFound)response).executionGraph();
-
- // One event per subtask: current attempt ID, current state, and that state's timestamp.
- bld.append("\"vertexevents\": [");
-
- first = true;
- for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
- if (first) {
- first = false;
- } else {
- bld.append(",");
- }
-
- bld.append("{");
- bld.append("\"vertexid\": \"").append(ev.getCurrentExecutionAttempt().getAttemptId()).append("\",");
- bld.append("\"newstate\": \"").append(ev.getExecutionState()).append("\",");
- bld.append("\"timestamp\": \"").append(ev.getStateTimestamp(ev.getExecutionState())).append("\"");
- bld.append("}");
- }
-
- bld.append("],");
-
- // A single job-level event carrying the graph's current state and its timestamp.
- bld.append("\"jobevents\": [");
-
- bld.append("{");
- bld.append("\"newstate\": \"").append(graph.getState()).append("\",");
- bld.append("\"timestamp\": \"").append(graph.getStatusTimestamp(graph.getState())).append("\"");
- bld.append("}");
-
- bld.append("]");
-
- bld.append("}");
- }
- else {
- // Job not found: report it as FINISHED at the current time, with no vertex events.
- bld.append("\"vertexevents\": [],");
- bld.append("\"jobevents\": [");
- bld.append("{");
- bld.append("\"newstate\": \"").append(JobStatus.FINISHED.toString()).append("\",");
- bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\"");
- bld.append("}");
- bld.append("]");
- bld.append("}");
- }
- }
-
- return bld.toString();
- }
- }
-
- /**
- * Builds JSON for one job vertex: its description under "groupvertex" (rendered by
- * JsonFactory.toJson) and a "verticetimes" object keyed by current execution attempt ID,
- * holding per-state timestamps for each subtask.
- *
- * @param graph the execution graph to read from
- * @param vertexId the job vertex to describe
- * @return the JSON document as a string
- */
- private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) {
- // NOTE(review): if vertexId is not part of graph, jobVertex may be null; how toJson
- // handles that is not visible here.
- ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
- StringBuilder bld = new StringBuilder();
-
- bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(",");
-
- bld.append("\"verticetimes\": {");
- boolean first = true;
- // NOTE(review): this loops over ALL vertices of the graph, not only jobVertex, so
- // "verticetimes" includes the subtasks of every vertex. It was presumably meant to use
- // jobVertex.getTaskVertices() only; confirm.
- for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-
- for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
- Execution exec = vertex.getCurrentExecutionAttempt();
-
- if(first) {
- first = false;
- } else {
- bld.append(","); }
-
- // The vertex name comes from toString() and is appended without any escaping.
- bld.append("\"").append(exec.getAttemptId()).append("\": {");
- bld.append("\"vertexid\": \"").append(exec.getAttemptId()).append("\",");
- bld.append("\"vertexname\": \"").append(vertex).append("\",");
- bld.append("\"CREATED\": ").append(vertex.getStateTimestamp(ExecutionState.CREATED)).append(",");
- bld.append("\"SCHEDULED\": ").append(vertex.getStateTimestamp(ExecutionState.SCHEDULED)).append(",");
- bld.append("\"DEPLOYING\": ").append(vertex.getStateTimestamp(ExecutionState.DEPLOYING)).append(",");
- bld.append("\"RUNNING\": ").append(vertex.getStateTimestamp(ExecutionState.RUNNING)).append(",");
- bld.append("\"FINISHED\": ").append(vertex.getStateTimestamp(ExecutionState.FINISHED)).append(",");
- bld.append("\"CANCELING\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELING)).append(",");
- bld.append("\"CANCELED\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELED)).append(",");
- bld.append("\"FAILED\": ").append(vertex.getStateTimestamp(ExecutionState.FAILED)).append("");
- bld.append("}");
- }
-
- }
- bld.append("}}");
- return bld.toString();
- }
-
-
- /**
- * Returns a JSON object with the Flink version ("version") and the revision's commit ID
- * ("revision"), both as strings. The values are concatenated in without escaping.
- */
- private String writeJsonForVersion() {
- return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" +
- EnvironmentInformation.getRevisionInformation().commitId + "\"}";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
deleted file mode 100644
index fe18d3f..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.legacy;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.util.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Legacy helpers that render execution vertices and job vertices as hand-built JSON strings.
- */
-public class JsonFactory {
-
- /**
- * Renders one subtask: current attempt ID, HTML-escaped simple name, current execution
- * state, and the FQDN host name of its assigned location ("(null)" if none is assigned).
- */
- public static String toJson(ExecutionVertex vertex) {
- StringBuilder json = new StringBuilder("");
- json.append("{");
- json.append("\"vertexid\": \"").append(vertex.getCurrentExecutionAttempt().getAttemptId()).append("\",");
- // NOTE(review): escapeHtml is HTML escaping; whether it also neutralizes JSON-special
- // characters (quotes, backslashes) depends on StringUtils.escapeHtml. Confirm.
- json.append("\"vertexname\": \"").append(StringUtils.escapeHtml(vertex.getSimpleName())).append("\",");
- json.append("\"vertexstatus\": \"").append(vertex.getExecutionState()).append("\",");
-
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
- String instanceName = location == null ? "(null)" : location.getFQDNHostname();
-
- json.append("\"vertexinstancename\": \"").append(instanceName).append("\"");
- json.append("}");
- return json.toString();
- }
-
- /**
- * Renders a job vertex: its ID, name, and parallelism, every subtask via
- * toJson(ExecutionVertex), its inputs as "backwardEdges", and, as top-level keys, the number
- * of subtasks in each ExecutionState (including states with a count of 0).
- */
- public static String toJson(ExecutionJobVertex jobVertex) {
- StringBuilder json = new StringBuilder("");
-
- json.append("{");
- json.append("\"groupvertexid\": \"").append(jobVertex.getJobVertexId()).append("\",");
- json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\",");
- json.append("\"numberofgroupmembers\": ").append(jobVertex.getParallelism()).append(",");
- json.append("\"groupmembers\": [");
-
- // Count how many subtasks are in each execution state
- Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
-
- // initialize with 0 so that every state appears in the output
- for (ExecutionState state : ExecutionState.values()) {
- stateCounts.put(state, 0);
- }
-
- ExecutionVertex[] vertices = jobVertex.getTaskVertices();
-
- for (int j = 0; j < vertices.length; j++) {
- ExecutionVertex vertex = vertices[j];
-
- json.append(toJson(vertex));
-
- // print delimiter
- if (j != vertices.length - 1) {
- json.append(",");
- }
-
- // Increment state status count
- int count = stateCounts.get(vertex.getExecutionState()) + 1;
- stateCounts.put(vertex.getExecutionState(), count);
- }
-
- json.append("],");
- json.append("\"backwardEdges\": [");
-
- List<IntermediateResult> inputs = jobVertex.getInputs();
-
- for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
- ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
-
- json.append("{");
- json.append("\"groupvertexid\": \"").append(input.getJobVertexId()).append("\",");
- // NOTE(review): this emits the name of jobVertex itself, not of the producing vertex
- // `input`, so every edge carries the consumer's name. It was presumably meant to be
- // input.getJobVertex().getName(); confirm.
- json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\"");
- json.append("}");
-
- // print delimiter
- if(inputNumber != inputs.size() - 1) {
- json.append(",");
- }
- }
- json.append("]");
-
- // list number of members for each status; the key order follows HashMap iteration
- // order, which is unspecified
- for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
- json.append(",\"").append(stateCount.getKey()).append("\": ").append(stateCount.getValue());
- }
-
- json.append("}");
-
- return json.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
deleted file mode 100644
index a2978a1..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.runner;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.relational.util.WebLogData;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * Simple runner that brings up a local cluster with the web server enabled and executes
- * WordCount, the WebLog analysis example, and WordCount again (three job runs), so that
- * their data shows up in the archive. It then blocks the main thread indefinitely.
- */
-@SuppressWarnings("serial")
-public class TestRunner {
-
- public static void main(String[] args) throws Exception {
-
- // start the cluster with the runtime monitor
- Configuration configuration = new Configuration();
- configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
- configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
- // NOTE(review): relative path with a hard-coded 0.10-SNAPSHOT version; it only resolves
- // when run from the repository root after a matching build.
- configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
- "flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
-
- LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
- cluster.start();
-
- final int port = cluster.getLeaderRPCPort();
- runWordCount(port);
- runWebLogAnalysisExample(port);
- runWordCount(port);
-
- // block the thread; nothing in this class ever notifies `o`
- Object o = new Object();
- synchronized (o) {
- o.wait();
- }
-
- // NOTE(review): effectively unreachable, since the wait above is never notified.
- cluster.shutdown();
- }
-
- /** Runs a WordCount over WordCountData.TEXT on the cluster at localhost:port and prints the counts. */
- private static void runWordCount(int port) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
- DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
- .sum(1);
-
- counts.print();
- }
-
- /**
- * Runs the WebLog analysis on WebLogData at localhost:port. It keeps ranked documents that
- * contain all keywords and have a rank above the threshold, drops those with a visit in the
- * filter year, and prints the result.
- */
- private static void runWebLogAnalysisExample(int port) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
- // get input data
- DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
- DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
- DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
-
- // Retain documents with keywords
- DataSet<Tuple1<String>> filterDocs = documents
- .filter(new FilterDocByKeyWords())
- .project(0);
-
- // Filter ranks by minimum rank
- DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
- .filter(new FilterByRank());
-
- // Filter visits by visit date
- DataSet<Tuple1<String>> filterVisits = visits
- .filter(new FilterVisitsByDate())
- .project(0);
-
- // Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
- DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
- filterDocs.join(filterRanks)
- .where(0).equalTo(1)
- .projectSecond(0,1,2);
-
- // Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
- DataSet<Tuple3<Integer, String, Integer>> result =
- joinDocsRanks.coGroup(filterVisits)
- .where(1).equalTo(0)
- .with(new AntiJoinVisits());
-
- result.print();
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /** Splits a line into lower-cased words and emits (word, 1) for each non-empty token. */
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line (toLowerCase() uses the default locale)
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
- /** Keeps documents whose text (field f1) contains every entry of KEYWORDS. */
- public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
- private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
- @Override
- public boolean filter(Tuple2<String, String> value) throws Exception {
- // FILTER
- // Only collect the document if all keywords are contained
- String docText = value.f1;
- for (String kw : KEYWORDS) {
- if (!docText.contains(kw)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /** Keeps rank records whose rank (field f0) is strictly greater than RANKFILTER. */
- public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
- private static final int RANKFILTER = 40;
-
- @Override
- public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
- return (value.f0 > RANKFILTER);
- }
- }
-
-
- /** Keeps visits whose date (field f1, first four characters parsed as the year) equals YEARFILTER. */
- public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
- private static final int YEARFILTER = 2007;
-
- @Override
- public boolean filter(Tuple2<String, String> value) throws Exception {
- // Parse date string with the format YYYY-MM-DD and extract the year
- String dateString = value.f1;
- int year = Integer.parseInt(dateString.substring(0,4));
- return (year == YEARFILTER);
- }
- }
-
-
- /** Emits every rank record of a group that has no matching visit (anti-join). */
- @FunctionAnnotation.ForwardedFieldsFirst("*")
- public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
- @Override
- public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
- // Check if there is an entry in the visits relation
- if (!visits.iterator().hasNext()) {
- for (Tuple3<Integer, String, Integer> next : ranks) {
- // Emit all rank pairs
- out.collect(next);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index ea01acd..a0d1eda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -56,20 +56,25 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{
// ------------------------------------------------------------------------
+ /**
+ * Converts the given accumulators into their string representations.
+ *
+ * @param accs accumulators keyed by name; may be null
+ * @return one result per map entry, in the map's entry-set iteration order, or an empty
+ * array if {@code accs} is null or empty. An entry whose accumulator is null is
+ * reported with both type and value {@code "null"}.
+ */
public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
- StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
-
- int i = 0;
- for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
- StringifiedAccumulatorResult result;
- Accumulator<?, ?> value = entry.getValue();
- if (value != null) {
- result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+ // Null or empty input yields an empty array instead of failing on accs.size().
+ if (accs == null || accs.isEmpty()) {
+ return new StringifiedAccumulatorResult[0];
+ }
+ else {
+ StringifiedAccumulatorResult[] results = new StringifiedAccumulatorResult[accs.size()];
+
+ int i = 0;
+ for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
+ StringifiedAccumulatorResult result;
+ Accumulator<?, ?> value = entry.getValue();
+ // The type is the accumulator's simple class name; the value is its toString().
+ if (value != null) {
+ result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+ } else {
+ result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+ }
+
+ results[i++] = result;
}
-
- results[i++] = result;
+ return results;
}
- return results;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4cee2f3..faabfb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import akka.dispatch.OnFailure;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -959,6 +960,10 @@ public class Execution implements Serializable {
return vertex.getSimpleName() + " - execution #" + attemptNumber;
}
+ // ------------------------------------------------------------------------
+ // Accumulators
+ // ------------------------------------------------------------------------
+
/**
* Update accumulators (discarded when the Execution has already been terminated).
* @param flinkAccumulators the flink internal accumulators
@@ -973,14 +978,23 @@ public class Execution implements Serializable {
}
}
}
+
+ /**
+ * Returns the user-defined accumulators of this execution. The backing field itself is
+ * returned, not a copy.
+ */
public Map<String, Accumulator<?, ?>> getUserAccumulators() {
return userAccumulators;
}
+ /**
+ * Returns this execution's user accumulators rendered as strings.
+ *
+ * @return one stringified result per user accumulator; see
+ * {@link StringifiedAccumulatorResult#stringifyAccumulatorResults(Map)} for how
+ * null or empty accumulator maps are handled
+ */
+ public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+ final Map<String, Accumulator<?, ?>> accumulators = this.userAccumulators;
+ return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulators);
+ }
+
+ /**
+ * Returns the Flink-internal accumulators of this execution, keyed by
+ * AccumulatorRegistry.Metric. The backing field itself is returned, not a copy.
+ */
public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
return flinkAccumulators;
}
+ // ------------------------------------------------------------------------
+ // Standard utilities
+ // ------------------------------------------------------------------------
+
@Override
public String toString() {
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
http://git-wip-us.apache.org/repos/asf/flink/blob/b3f79172/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 78e9804..0d039cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -229,6 +229,15 @@ public class ExecutionVertex implements Serializable {
return currentExecution.getAssignedResourceLocation();
}
+ public Execution getPriorExecutionAttempt(int attemptNumber) {
+ if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+ return priorExecutions.get(attemptNumber);
+ }
+ else {
+ throw new IllegalArgumentException("attempt does not exist");
+ }
+ }
+
+ /**
+ * Returns the execution graph this vertex belongs to, obtained from its job vertex.
+ */
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}