You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:26 UTC
[28/30] flink git commit: [FLINK-7941][flip6] Port
SubtasksTimesHandler to new REST endpoint
[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint
This closes #4930.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712d4cfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712d4cfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712d4cfc
Branch: refs/heads/master
Commit: 712d4cfc0fb8591fb6ab7d27baede158cd227c60
Parents: 430fa7b
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 19:01:45 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 11 ++
.../rest/handler/job/SubtasksTimesHandler.java | 105 +++++++++++++
.../rest/messages/SubtasksTimesHeaders.java | 69 +++++++++
.../rest/messages/SubtasksTimesInfo.java | 147 +++++++++++++++++++
.../rest/messages/SubtasksTimesInfoTest.java | 60 ++++++++
5 files changed, 392 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4de77b0..6766784 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -65,6 +66,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -307,6 +309,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
+ SubtasksTimesHandler subtasksTimesHandler = new SubtasksTimesHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ SubtasksTimesHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
new file mode 100644
index 0000000..feae3ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler which returns the state timestamps and the duration of every subtask of a job vertex.
+ */
+public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<SubtasksTimesInfo, JobVertexMessageParameters> {
+ public SubtasksTimesHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ }
+
+ @Override
+ protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) {
+ JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+ AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+ // getJobVertex may return null (unknown vertex id): fail with a clear message instead of an NPE below.
+ if (jobVertex == null) {
+ throw new IllegalArgumentException("JobVertex " + jobVertexID + " not found");
+ }
+
+ final String id = jobVertex.getJobVertexId().toString();
+ final String name = jobVertex.getName();
+ final long now = System.currentTimeMillis();
+ final List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
+ int num = 0;
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
+ ExecutionState status = vertex.getExecutionState();
+ // Duration spans SCHEDULED up to the terminal-state timestamp (or now if not terminal); -1 if no SCHEDULED timestamp is set.
+ long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];
+ long start = scheduledTime > 0 ? scheduledTime : -1;
+ long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
+ long duration = start >= 0 ? end - start : -1L;
+
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
+ Map<String, Long> timestampMap = new HashMap<>();
+ for (ExecutionState state : ExecutionState.values()) {
+ timestampMap.put(state.name(), timestamps[state.ordinal()]);
+ }
+
+ subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(
+ num++,
+ locationString,
+ duration,
+ timestampMap));
+ }
+ return new SubtasksTimesInfo(id, name, now, subtasks);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
new file mode 100644
index 0000000..81056da
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} describing the REST call that is served by the {@link SubtasksTimesHandler}.
+ */
+public class SubtasksTimesHeaders implements MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> {
+
+ public static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" +
+ JobVertexIdPathParameter.KEY + "/subtasktimes";
+
+ private static final SubtasksTimesHeaders INSTANCE = new SubtasksTimesHeaders();
+
+ public static SubtasksTimesHeaders getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SubtasksTimesInfo> getResponseClass() {
+ return SubtasksTimesInfo.class;
+ }
+
+ @Override
+ public JobVertexMessageParameters getUnresolvedMessageParameters() {
+ return new JobVertexMessageParameters();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
new file mode 100644
index 0000000..d97a0d0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link SubtasksTimesHandler}: id and name of a job vertex plus the times of its subtasks.
+ */
+public class SubtasksTimesInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_ID = "id";
+ public static final String FIELD_NAME_NAME = "name";
+ public static final String FIELD_NAME_NOW = "now";
+ public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+ @JsonProperty(FIELD_NAME_ID)
+ private final String id;
+
+ @JsonProperty(FIELD_NAME_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_NAME_NOW)
+ private final long now;
+
+ @JsonProperty(FIELD_NAME_SUBTASKS)
+ private final List<SubtaskTimeInfo> subtasks;
+
+ @JsonCreator
+ public SubtasksTimesInfo(
+ @JsonProperty(FIELD_NAME_ID) String id,
+ @JsonProperty(FIELD_NAME_NAME) String name,
+ @JsonProperty(FIELD_NAME_NOW) long now,
+ @JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskTimeInfo> subtasks) {
+ this.id = checkNotNull(id);
+ this.name = checkNotNull(name);
+ this.now = now;
+ this.subtasks = checkNotNull(subtasks);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+
+ SubtasksTimesInfo that = (SubtasksTimesInfo) o;
+ return Objects.equals(id, that.id) &&
+ Objects.equals(name, that.name) &&
+ now == that.now &&
+ Objects.equals(subtasks, that.subtasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, now, subtasks);
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+ /**
+ * Times of a single subtask: its index, host, duration and the timestamp recorded per state name.
+ */
+ public static final class SubtaskTimeInfo {
+
+ public static final String FIELD_NAME_SUBTASK = "subtask";
+ public static final String FIELD_NAME_HOST = "host";
+ public static final String FIELD_NAME_DURATION = "duration";
+ public static final String FIELD_NAME_TIMESTAMPS = "timestamps";
+
+ @JsonProperty(FIELD_NAME_SUBTASK)
+ private final int subtask;
+
+ @JsonProperty(FIELD_NAME_HOST)
+ private final String host;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_TIMESTAMPS)
+ private final Map<String, Long> timestamps;
+
+ @JsonCreator
+ public SubtaskTimeInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+ @JsonProperty(FIELD_NAME_HOST) String host,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+ this.subtask = subtask;
+ this.host = checkNotNull(host);
+ this.duration = duration;
+ this.timestamps = checkNotNull(timestamps);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+
+ SubtaskTimeInfo that = (SubtaskTimeInfo) o;
+ return subtask == that.subtask &&
+ Objects.equals(host, that.host) &&
+ duration == that.duration &&
+ Objects.equals(timestamps, that.timestamps);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtask, host, duration, timestamps);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
new file mode 100644
index 0000000..82eb21b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests that the {@link SubtasksTimesInfo} can be marshalled and unmarshalled.
+ */
+public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<SubtasksTimesInfo> {
+
+ @Override
+ protected Class<SubtasksTimesInfo> getTestResponseClass() {
+ return SubtasksTimesInfo.class;
+ }
+
+ @Override
+ protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
+ List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
+ // Every subtask gets its own, non-empty timestamp map so that each one is round-tripped with real entries.
+ Map<String, Long> subTimeMap1 = new HashMap<>();
+ subTimeMap1.put("state11", System.currentTimeMillis());
+ subTimeMap1.put("state12", System.currentTimeMillis());
+ subTimeMap1.put("state13", System.currentTimeMillis());
+ subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));
+
+ Map<String, Long> subTimeMap2 = new HashMap<>();
+ subTimeMap2.put("state21", System.currentTimeMillis());
+ subTimeMap2.put("state22", System.currentTimeMillis());
+ subTimeMap2.put("state23", System.currentTimeMillis());
+ subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));
+
+ Map<String, Long> subTimeMap3 = new HashMap<>();
+ subTimeMap3.put("state31", System.currentTimeMillis());
+ subTimeMap3.put("state32", System.currentTimeMillis());
+ subTimeMap3.put("state33", System.currentTimeMillis());
+ subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));
+
+ return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);
+ }
+}