You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:00 UTC

[28/51] [abbrv] flink git commit: [FLINK-2554] [web dashboard] Add request handler that lists exceptions encountered during execution

[FLINK-2554] [web dashboard] Add request handler that lists exceptions encountered during execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1984a3fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1984a3fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1984a3fb

Branch: refs/heads/master
Commit: 1984a3fbc22498b6e88b3c3144e59c2af098f1e2
Parents: b29a5d4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 20 17:06:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:51 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/JsonFactory.java   |  40 ++++++-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   4 +-
 .../handlers/JobExceptionsHandler.java          | 107 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |   2 +
 4 files changed, 149 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
index 40d095f..66d7b0c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -180,9 +182,6 @@ public class JsonFactory {
 		gen.writeEndObject();
 	}
 	
-	
-	
-	
 	public static String generateJobsOverviewJSON(JobsWithIDsOverview overview) {
 		try {
 			List<JobID> runningIDs = overview.getJobsRunningOrPending();
@@ -222,6 +221,41 @@ public class JsonFactory {
 		}
 	}
 	
+	/**
+	 * Builds the JSON object with the "root-exception" of a job and, when the list is non-empty,
+	 * the "all-exceptions" array plus the "truncated" flag (both omitted for a null/empty list).
+	 */
+	public static String generateExceptionsJson(Throwable rootCause,
+			List<JobExceptionsHandler.ExceptionWithContext> allExceptions, boolean truncated) {
+		StringWriter out = new StringWriter();
+		try {
+			JsonGenerator json = jacksonFactory.createJsonGenerator(out);
+			json.writeStartObject();
+			json.writeStringField("root-exception", ExceptionUtils.stringifyException(rootCause));
+
+			boolean hasTaskExceptions = allExceptions != null && !allExceptions.isEmpty();
+			if (hasTaskExceptions) {
+				json.writeArrayFieldStart("all-exceptions");
+				for (JobExceptionsHandler.ExceptionWithContext entry : allExceptions) {
+					json.writeStartObject();
+					json.writeStringField("exception", ExceptionUtils.stringifyException(entry.getException()));
+					json.writeStringField("task", entry.getTaskName());
+					json.writeStringField("location", entry.getLocation());
+					json.writeEndObject();
+				}
+				json.writeEndArray();
+				json.writeBooleanField("truncated", truncated);
+			}
+			json.writeEndObject();
+			json.close();
+			return out.toString();
+		}
+		catch (Exception e) {
+			// writing into an in-memory StringWriter is not expected to fail
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/** Don't instantiate */

http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0799328..51df17c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobSummaryHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVerticesOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobsOverviewHandler;
@@ -138,11 +139,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
 			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
 			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
+			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
 
 //			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
 
 			// the handler for the legacy requests
-			.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+				.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
 					
 			// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(webRootDir));

http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
new file mode 100644
index 0000000..0f8cba7
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request handler that returns the exceptions of a job: the root failure cause of the job,
+ * plus the failure causes of the individual tasks (up to a fixed limit).
+ */
+public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+	/** Maximum number of task exceptions included in a response. */
+	private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		// most important is the root failure cause
+		Throwable rootException = graph.getFailureCause();
+
+		// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
+		List<ExceptionWithContext> localExceptions = new ArrayList<>();
+		boolean truncated = false;
+
+		for (ExecutionVertex task : graph.getAllExecutionVertices()) {
+			Throwable t = task.getFailureCause();
+			if (t != null) {
+				// one more failed task beyond the limit only marks the result as truncated
+				if (localExceptions.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
+					truncated = true;
+					break;
+				}
+				InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
+				String locationString = location != null ?
+						location.getFQDNHostname() + ':' + location.dataPort() :
+						"(unassigned)";
+
+				localExceptions.add(new ExceptionWithContext(t, task.getTaskName(), locationString));
+			}
+		}
+
+		// a single task exception that is the root exception itself need not be listed twice
+		if (localExceptions.size() == 1 && localExceptions.get(0).getException() == rootException) {
+			localExceptions = null;
+		}
+
+		return JsonFactory.generateExceptionsJson(rootException, localExceptions, truncated);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Class that encapsulates an exception, together with the name of the throwing task, and
+	 * the location ("host:port" or "(unassigned)") of the instance on which the exception occurred.
+	 */
+	public static class ExceptionWithContext {
+		private final Throwable exception;
+		private final String taskName;
+		private final String location;
+
+		public ExceptionWithContext(Throwable exception, String taskName, String location) {
+			this.exception = exception;
+			this.taskName = taskName;
+			this.location = location;
+		}
+
+		public Throwable getException() {
+			return exception;
+		}
+
+		public String getTaskName() {
+			return taskName;
+		}
+
+		public String getLocation() {
+			return location;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1984a3fb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index aa784e6..2e50ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -53,6 +53,8 @@ public class WebMonitorUtils {
 			}
 		}
 		
+		lastChanged = Math.max(lastChanged, finished);
+		
 		return new JobDetails(job.getJobID(), job.getJobName(),
 				started, finished, status, lastChanged,  
 				countsPerStatus, numTotalTasks);