You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:00 UTC
[28/51] [abbrv] flink git commit: [FLINK-2554] [web dashboard] Add
request handler that lists exceptions encountered during execution
[FLINK-2554] [web dashboard] Add request handler that lists exceptions encountered during execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1984a3fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1984a3fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1984a3fb
Branch: refs/heads/master
Commit: 1984a3fbc22498b6e88b3c3144e59c2af098f1e2
Parents: b29a5d4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 20 17:06:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:51 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/webmonitor/JsonFactory.java | 40 ++++++-
.../runtime/webmonitor/WebRuntimeMonitor.java | 4 +-
.../handlers/JobExceptionsHandler.java | 107 +++++++++++++++++++
.../runtime/webmonitor/WebMonitorUtils.java | 2 +
4 files changed, 149 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
index 40d095f..66d7b0c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -180,9 +182,6 @@ public class JsonFactory {
gen.writeEndObject();
}
-
-
-
public static String generateJobsOverviewJSON(JobsWithIDsOverview overview) {
try {
List<JobID> runningIDs = overview.getJobsRunningOrPending();
@@ -222,6 +221,41 @@ public class JsonFactory {
}
}
+ /**
+  * Renders the exceptions of a job as a JSON object.
+  *
+  * <p>The object has the following fields:
+  * <ul>
+  *   <li>"root-exception": the stringified root cause; omitted if {@code rootCause} is null.</li>
+  *   <li>"all-exceptions": only if {@code allExceptions} is non-empty; an array of objects with
+  *       the fields "exception", "task" and "location".</li>
+  *   <li>"truncated": written together with "all-exceptions"; the value of {@code truncated}.</li>
+  * </ul>
+  *
+  * @param rootCause The root failure cause of the job; may be null.
+  * @param allExceptions The exceptions reported by individual tasks; may be null or empty.
+  * @param truncated Whether the list of task exceptions was cut off at a limit.
+  * @return The JSON string.
+  * @throws RuntimeException If the JSON generation fails.
+  */
+ public static String generateExceptionsJson(Throwable rootCause,
+         List<JobExceptionsHandler.ExceptionWithContext> allExceptions,
+         boolean truncated) {
+     try {
+         StringWriter writer = new StringWriter();
+         JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
+
+         gen.writeStartObject();
+
+         // The root cause may be absent (presumably for jobs that have not failed).
+         // Do not hand null to ExceptionUtils.stringifyException; omit the field instead.
+         if (rootCause != null) {
+             gen.writeStringField("root-exception", ExceptionUtils.stringifyException(rootCause));
+         }
+
+         if (allExceptions != null && !allExceptions.isEmpty()) {
+             gen.writeArrayFieldStart("all-exceptions");
+             for (JobExceptionsHandler.ExceptionWithContext ewc : allExceptions) {
+                 gen.writeStartObject();
+                 gen.writeStringField("exception", ExceptionUtils.stringifyException(ewc.getException()));
+                 gen.writeStringField("task", ewc.getTaskName());
+                 gen.writeStringField("location", ewc.getLocation());
+                 gen.writeEndObject();
+             }
+             gen.writeEndArray();
+
+             gen.writeBooleanField("truncated", truncated);
+         }
+
+         gen.writeEndObject();
+
+         gen.close();
+         return writer.toString();
+     }
+     catch (Exception e) {
+         // this should not happen
+         throw new RuntimeException(e.getMessage(), e);
+     }
+ }
+
// --------------------------------------------------------------------------------------------
/** Don't instantiate */
http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0799328..51df17c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobSummaryHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVerticesOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobsOverviewHandler;
@@ -138,11 +139,12 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
+ .GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
// .GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
// the handler for the legacy requests
- .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+ .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));
http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
new file mode 100644
index 0000000..0f8cba7
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request handler that returns the exceptions of a job.
+ *
+ * <p>The response contains the job's root failure cause. It also lists the failure causes
+ * reported by the individual execution vertices, up to a fixed maximum number of entries.
+ */
+public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+    /** Maximum number of task-level exceptions included in a response. */
+    private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+    public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
+        super(executionGraphHolder);
+    }
+
+    /**
+     * Collects the root failure cause of the job and the failure causes of its tasks, and
+     * renders them as JSON via {@link JsonFactory#generateExceptionsJson}.
+     *
+     * @param graph The execution graph of the requested job.
+     * @param params The request parameters (not used by this handler).
+     * @return The JSON response.
+     */
+    @Override
+    public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+        // most important is the root failure cause
+        Throwable rootException = graph.getFailureCause();
+
+        // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
+        List<ExceptionWithContext> localExceptions = new ArrayList<>();
+        boolean truncated = false;
+
+        for (ExecutionVertex task : graph.getAllExecutionVertices()) {
+            Throwable t = task.getFailureCause();
+            if (t != null) {
+                // another exception exists beyond the limit: stop and flag the list as truncated
+                if (localExceptions.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
+                    truncated = true;
+                    break;
+                }
+
+                InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
+                String locationString = location != null ?
+                        location.getFQDNHostname() + ':' + location.dataPort() :
+                        "(unassigned)";
+
+                localExceptions.add(new ExceptionWithContext(t, task.getTaskName(), locationString));
+            }
+        }
+
+        // if only one exception occurred in a task, and that is the root exception,
+        // there is no need to display it twice (identity comparison is intentional)
+        if (localExceptions.size() == 1 && localExceptions.get(0).getException() == rootException) {
+            localExceptions = null;
+        }
+
+        return JsonFactory.generateExceptionsJson(rootException, localExceptions, truncated);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Class that encapsulates an exception, together with the name of the throwing task and
+     * the location of the instance on which the exception occurred.
+     */
+    public static class ExceptionWithContext {
+
+        private final Throwable exception;
+        private final String taskName;
+        private final String location;
+
+        /**
+         * @param exception The exception thrown by the task.
+         * @param taskName The name of the task that threw the exception.
+         * @param location A description of where the task ran; {@link JobExceptionsHandler}
+         *                 passes "host:dataPort" or "(unassigned)".
+         */
+        public ExceptionWithContext(Throwable exception, String taskName, String location) {
+            this.exception = exception;
+            this.taskName = taskName;
+            this.location = location;
+        }
+
+        /** @return The exception thrown by the task. */
+        public Throwable getException() {
+            return exception;
+        }
+
+        /** @return The name of the task that threw the exception. */
+        public String getTaskName() {
+            return taskName;
+        }
+
+        /** @return The location description of the task. */
+        public String getLocation() {
+            return location;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index aa784e6..2e50ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -53,6 +53,8 @@ public class WebMonitorUtils {
}
}
+ lastChanged = Math.max(lastChanged, finished);
+
return new JobDetails(job.getJobID(), job.getJobName(),
started, finished, status, lastChanged,
countsPerStatus, numTotalTasks);