You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 20:15:19 UTC

[6/6] flink git commit: [FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex is unknown

[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex is unknown

Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce

Branch: refs/heads/master
Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e
Parents: 056c72a
Author: gyao <ga...@data-artisans.com>
Authored: Wed Jan 24 12:24:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:55:54 2018 +0100

----------------------------------------------------------------------
 .../executiongraph/AccessExecutionGraph.java    |  3 +-
 .../flink/runtime/rest/NotFoundException.java   |  4 ++
 .../job/AbstractExecutionGraphHandler.java      | 15 ++++++-
 .../job/JobVertexTaskManagersHandler.java       | 41 +++++++++++++-------
 .../messages/JobVertexTaskManagersHeaders.java  |  2 +
 .../messages/JobVertexTaskManagersInfo.java     | 12 +++---
 6 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 362afa1..8d1fa1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
 	 * Returns the job vertex for the given {@link JobVertexID}.
 	 *
 	 * @param id id of job vertex to be returned
-	 * @return job vertex for the given id, or null
+	 * @return job vertex for the given id, or {@code null}
 	 */
+	@Nullable
 	AccessExecutionJobVertex getJobVertex(JobVertexID id);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
index 50060b0..f9db334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
 	public NotFoundException(String message) {
 		super(message, HttpResponseStatus.NOT_FOUND);
 	}
+
+	public NotFoundException(String message, Throwable cause) {
+		super(message, HttpResponseStatus.NOT_FOUND, cause);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 7192832..7c42af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex
 				} catch (RestHandlerException rhe) {
 					throw new CompletionException(rhe);
 				}
-			},
-			executor);
+			}, executor)
+			.exceptionally(throwable -> {
+				throwable = ExceptionUtils.stripCompletionException(throwable);
+				if (throwable instanceof FlinkJobNotFoundException) {
+					throw new CompletionException(
+						new NotFoundException(String.format("Job %s not found", jobId), throwable));
+				} else {
+					throw new CompletionException(throwable);
+				}
+			});
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 9b59e8d..24650a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
- * Request handler for the job vertex task managers.
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
 	private MetricFetcher<?> metricFetcher;
@@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			Executor executor,
 			MetricFetcher<?> metricFetcher) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
-		this.metricFetcher = metricFetcher;
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
 	}
 
 	@Override
@@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
 
+		if (jobVertex == null) {
+			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
+		}
+
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
-			List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
-			if (vertices == null) {
-				vertices = new ArrayList<>();
-				taskManagerVertices.put(taskManager, vertices);
-			}
-
+			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort();
+			List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent(
+				taskManager,
+				ignored -> new ArrayList<>(4));
 			vertices.add(vertex);
 		}
 
 		final long now = System.currentTimeMillis();
 
-		List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+		List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
 		for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
 			String host = entry.getKey();
 			List<AccessExecutionVertex> taskVertices = entry.getValue();
@@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				duration = -1L;
 			}
 
-			ExecutionState jobVertexState =
-				ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+			ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(
+				tasksPerState,
+				taskVertices.size());
+
 			final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
 				counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
 				counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
@@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				counts.getNumRecordsOut(),
 				counts.isNumRecordsOutComplete());
 
-			Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+			Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
 			for (ExecutionState state : ExecutionState.values()) {
 				statusCounts.put(state, tasksPerState[state.ordinal()]);
 			}
-			taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+			taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(
+				host,
+				jobVertexState,
+				startTime,
+				endTime,
+				duration,
+				jobVertexMetrics,
+				statusCounts));
 		}
 
 		return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 311d047..8424095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest
 		"/:" + JobVertexIdPathParameter.KEY +
 		"/taskmanagers";
 
+	private JobVertexTaskManagersHeaders() {}
+
 	@Override
 	public Class<EmptyRequestBody> getRequestClass() {
 		return EmptyRequestBody.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
index fc30155..75ff570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
 	private final long now;
 
 	@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
-	private List<TaskManagersInfo> taskManagers;
+	private Collection<TaskManagersInfo> taskManagerInfos;
 
 	@JsonCreator
 	public JobVertexTaskManagersInfo(
 			@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
 			@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
 			@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
-			@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+			@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) {
 		this.jobVertexID = checkNotNull(jobVertexID);
 		this.name = checkNotNull(name);
 		this.now = now;
-		this.taskManagers = checkNotNull(taskManagers);
+		this.taskManagerInfos = checkNotNull(taskManagerInfos);
 	}
 
 	@Override
@@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
 		return Objects.equals(jobVertexID, that.jobVertexID) &&
 			Objects.equals(name, that.name) &&
 			now == that.now &&
-			Objects.equals(taskManagers, that.taskManagers);
+			Objects.equals(taskManagerInfos, that.taskManagerInfos);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(jobVertexID, name, now, taskManagers);
+		return Objects.hash(jobVertexID, name, now, taskManagerInfos);
 	}
 
 	// ---------------------------------------------------