You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 20:15:19 UTC
[6/6] flink git commit: [FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown
[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown
Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce
Branch: refs/heads/master
Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e
Parents: 056c72a
Author: gyao <ga...@data-artisans.com>
Authored: Wed Jan 24 12:24:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:55:54 2018 +0100
----------------------------------------------------------------------
.../executiongraph/AccessExecutionGraph.java | 3 +-
.../flink/runtime/rest/NotFoundException.java | 4 ++
.../job/AbstractExecutionGraphHandler.java | 15 ++++++-
.../job/JobVertexTaskManagersHandler.java | 41 +++++++++++++-------
.../messages/JobVertexTaskManagersHeaders.java | 2 +
.../messages/JobVertexTaskManagersInfo.java | 12 +++---
6 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 362afa1..8d1fa1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
* Returns the job vertex for the given {@link JobVertexID}.
*
* @param id id of job vertex to be returned
- * @return job vertex for the given id, or null
+ * @return job vertex for the given id, or {@code null}
*/
+ @Nullable
AccessExecutionJobVertex getJobVertex(JobVertexID id);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
index 50060b0..f9db334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
public NotFoundException(String message) {
super(message, HttpResponseStatus.NOT_FOUND);
}
+
+ public NotFoundException(String message, Throwable cause) {
+ super(message, HttpResponseStatus.NOT_FOUND, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 7192832..7c42af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex
} catch (RestHandlerException rhe) {
throw new CompletionException(rhe);
}
- },
- executor);
+ }, executor)
+ .exceptionally(throwable -> {
+ throwable = ExceptionUtils.stripCompletionException(throwable);
+ if (throwable instanceof FlinkJobNotFoundException) {
+ throw new CompletionException(
+ new NotFoundException(String.format("Job %s not found", jobId), throwable));
+ } else {
+ throw new CompletionException(throwable);
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 9b59e8d..24650a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
@@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
- * Request handler for the job vertex task managers.
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
*/
public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
private MetricFetcher<?> metricFetcher;
@@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
Executor executor,
MetricFetcher<?> metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
- this.metricFetcher = metricFetcher;
+ this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
}
@Override
@@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+ if (jobVertex == null) {
+ throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
+ }
+
// Build a map that groups tasks by TaskManager
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
- String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
- List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
- if (vertices == null) {
- vertices = new ArrayList<>();
- taskManagerVertices.put(taskManager, vertices);
- }
-
+ String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort();
+ List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent(
+ taskManager,
+ ignored -> new ArrayList<>(4));
vertices.add(vertex);
}
final long now = System.currentTimeMillis();
- List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+ List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
String host = entry.getKey();
List<AccessExecutionVertex> taskVertices = entry.getValue();
@@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
duration = -1L;
}
- ExecutionState jobVertexState =
- ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+ ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(
+ tasksPerState,
+ taskVertices.size());
+
final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
@@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
counts.getNumRecordsOut(),
counts.isNumRecordsOutComplete());
- Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+ Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) {
statusCounts.put(state, tasksPerState[state.ordinal()]);
}
- taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+ taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(
+ host,
+ jobVertexState,
+ startTime,
+ endTime,
+ duration,
+ jobVertexMetrics,
+ statusCounts));
}
return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 311d047..8424095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest
"/:" + JobVertexIdPathParameter.KEY +
"/taskmanagers";
+ private JobVertexTaskManagersHeaders() {}
+
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
index fc30155..75ff570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import java.util.List;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
private final long now;
@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
- private List<TaskManagersInfo> taskManagers;
+ private Collection<TaskManagersInfo> taskManagerInfos;
@JsonCreator
public JobVertexTaskManagersInfo(
@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
- @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+ @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) {
this.jobVertexID = checkNotNull(jobVertexID);
this.name = checkNotNull(name);
this.now = now;
- this.taskManagers = checkNotNull(taskManagers);
+ this.taskManagerInfos = checkNotNull(taskManagerInfos);
}
@Override
@@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
return Objects.equals(jobVertexID, that.jobVertexID) &&
Objects.equals(name, that.name) &&
now == that.now &&
- Objects.equals(taskManagers, that.taskManagers);
+ Objects.equals(taskManagerInfos, that.taskManagerInfos);
}
@Override
public int hashCode() {
- return Objects.hash(jobVertexID, name, now, taskManagers);
+ return Objects.hash(jobVertexID, name, now, taskManagerInfos);
}
// ---------------------------------------------------