You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:26 UTC

[28/30] flink git commit: [FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint

[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint

This closes #4930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712d4cfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712d4cfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712d4cfc

Branch: refs/heads/master
Commit: 712d4cfc0fb8591fb6ab7d27baede158cd227c60
Parents: 430fa7b
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 19:01:45 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../rest/handler/job/SubtasksTimesHandler.java  | 105 +++++++++++++
 .../rest/messages/SubtasksTimesHeaders.java     |  69 +++++++++
 .../rest/messages/SubtasksTimesInfo.java        | 147 +++++++++++++++++++
 .../rest/messages/SubtasksTimesInfoTest.java    |  60 ++++++++
 5 files changed, 392 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4de77b0..6766784 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -65,6 +66,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -307,6 +309,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
+		SubtasksTimesHandler subtasksTimesHandler = new SubtasksTimesHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			SubtasksTimesHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;

http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
new file mode 100644
index 0000000..feae3ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler that reports, for every subtask of a job vertex, its state timestamps and duration.
+ */
+public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
+	public SubtasksTimesHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	/**
+	 * Collects the state timestamps of every subtask of the requested job vertex. A subtask's duration
+	 * runs from its SCHEDULED timestamp to its terminal-state timestamp (or to "now" while it is not
+	 * terminal) and is -1 if the subtask has no positive SCHEDULED timestamp.
+	 * NOTE(review): the looked-up job vertex is dereferenced without a null check - confirm unknown ids cannot reach here.
+	 */
+	@Override
+	protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) {
+		final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+		final AccessExecutionJobVertex requestedVertex = executionGraph.getJobVertex(vertexId);
+
+		final String id = requestedVertex.getJobVertexId().toString();
+		final String name = requestedVertex.getName();
+		final long now = System.currentTimeMillis();
+		final List<SubtasksTimesInfo.SubtaskTimeInfo> result = new ArrayList<>();
+
+		int subtaskIndex = 0;
+		for (AccessExecutionVertex subtask : requestedVertex.getTaskVertices()) {
+			final long[] stateTimestamps = subtask.getCurrentExecutionAttempt().getStateTimestamps();
+			final ExecutionState currentState = subtask.getExecutionState();
+
+			// a subtask in a terminal state stops the clock at the timestamp of that state
+			final long scheduledAt = stateTimestamps[ExecutionState.SCHEDULED.ordinal()];
+			final long endedAt = currentState.isTerminal() ? stateTimestamps[currentState.ordinal()] : now;
+			final long duration = scheduledAt > 0 ? endedAt - scheduledAt : -1L;
+
+			final TaskManagerLocation assignedLocation = subtask.getCurrentAssignedResourceLocation();
+			final String host;
+			if (assignedLocation != null) {
+				host = assignedLocation.getHostname();
+			} else {
+				host = "(unassigned)";
+			}
+
+			final Map<String, Long> timestampsByState = new HashMap<>();
+			for (ExecutionState state : ExecutionState.values()) {
+				timestampsByState.put(state.name(), stateTimestamps[state.ordinal()]);
+			}
+
+			result.add(new SubtasksTimesInfo.SubtaskTimeInfo(subtaskIndex, host, duration, timestampsByState));
+			subtaskIndex++;
+		}
+
+		return new SubtasksTimesInfo(id, name, now, result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
new file mode 100644
index 0000000..81056da
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers describing the GET endpoint served by the {@link SubtasksTimesHandler}.
+ */
+public class SubtasksTimesHeaders implements MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> {
+
+	public static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/subtasktimes";
+
+	// single shared instance, handed out by getInstance()
+	private static final SubtasksTimesHeaders INSTANCE = new SubtasksTimesHeaders();
+
+	public static SubtasksTimesHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SubtasksTimesInfo> getResponseClass() {
+		return SubtasksTimesInfo.class;
+	}
+
+	@Override
+	public JobVertexMessageParameters getUnresolvedMessageParameters() {
+		return new JobVertexMessageParameters();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
new file mode 100644
index 0000000..d97a0d0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link SubtasksTimesHandler}: the id and name of a job vertex, a
+ * {@code now} timestamp and the list of per-subtask timing entries. Instances are compared
+ * by value and are (de)serialized via the Jackson annotations on the fields and constructor.
+ */
+public class SubtasksTimesInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_ID = "id";
+	public static final String FIELD_NAME_NAME = "name";
+	public static final String FIELD_NAME_NOW = "now";
+	public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+	@JsonProperty(FIELD_NAME_ID)
+	private final String id;
+
+	@JsonProperty(FIELD_NAME_NAME)
+	private final String name;
+
+	@JsonProperty(FIELD_NAME_NOW)
+	private final long now;
+
+	@JsonProperty(FIELD_NAME_SUBTASKS)
+	private final List<SubtaskTimeInfo> subtasks;
+
+	@JsonCreator
+	public SubtasksTimesInfo(
+			@JsonProperty(FIELD_NAME_ID) String id,
+			@JsonProperty(FIELD_NAME_NAME) String name,
+			@JsonProperty(FIELD_NAME_NOW) long now,
+			@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskTimeInfo> subtasks) {
+		this.id = checkNotNull(id);
+		this.name = checkNotNull(name);
+		this.now = now;
+		this.subtasks = checkNotNull(subtasks);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || this.getClass() != o.getClass()) {
+			return false;
+		}
+		SubtasksTimesInfo that = (SubtasksTimesInfo) o;
+		return Objects.equals(id, that.id) &&
+			Objects.equals(name, that.name) &&
+			now == that.now &&
+			Objects.equals(subtasks, that.subtasks);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id, name, now, subtasks);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Nested class to encapsulate the times info of one subtask: its index, host, duration
+	 * and a map of named timestamps. The host and the timestamp map must not be null.
+	 */
+	public static final class SubtaskTimeInfo {
+
+		public static final String FIELD_NAME_SUBTASK = "subtask";
+		public static final String FIELD_NAME_HOST = "host";
+		public static final String FIELD_NAME_DURATION = "duration";
+		public static final String FIELD_NAME_TIMESTAMPS = "timestamps";
+
+		@JsonProperty(FIELD_NAME_SUBTASK)
+		private final int subtask;
+
+		@JsonProperty(FIELD_NAME_HOST)
+		private final String host;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final long duration;
+
+		@JsonProperty(FIELD_NAME_TIMESTAMPS)
+		private final Map<String, Long> timestamps;
+
+		@JsonCreator
+		public SubtaskTimeInfo(
+				@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+				@JsonProperty(FIELD_NAME_HOST) String host,
+				@JsonProperty(FIELD_NAME_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+			this.subtask = subtask;
+			this.host = checkNotNull(host);
+			this.duration = duration;
+			this.timestamps = checkNotNull(timestamps);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (null == o || this.getClass() != o.getClass()) {
+				return false;
+			}
+			SubtaskTimeInfo that = (SubtaskTimeInfo) o;
+			return subtask == that.subtask &&
+				Objects.equals(host, that.host) &&
+				duration == that.duration &&
+				Objects.equals(timestamps, that.timestamps);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(subtask, host, duration, timestamps);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/712d4cfc/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
new file mode 100644
index 0000000..82eb21b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests that the {@link SubtasksTimesInfo} can be marshalled and unmarshalled.
+ */
+public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<SubtasksTimesInfo> {
+
+	@Override
+	protected Class<SubtasksTimesInfo> getTestResponseClass() {
+		return SubtasksTimesInfo.class;
+	}
+
+	@Override
+	protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
+		List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
+		// every subtask gets its own populated timestamp map, so the round trip covers non-empty maps for each entry
+		Map<String, Long> subTimeMap1 = new HashMap<>();
+		subTimeMap1.put("state11", System.currentTimeMillis());
+		subTimeMap1.put("state12", System.currentTimeMillis());
+		subTimeMap1.put("state13", System.currentTimeMillis());
+		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));
+
+		Map<String, Long> subTimeMap2 = new HashMap<>();
+		subTimeMap2.put("state21", System.currentTimeMillis());
+		subTimeMap2.put("state22", System.currentTimeMillis());
+		subTimeMap2.put("state23", System.currentTimeMillis());
+		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));
+
+		Map<String, Long> subTimeMap3 = new HashMap<>();
+		subTimeMap3.put("state31", System.currentTimeMillis());
+		subTimeMap3.put("state32", System.currentTimeMillis());
+		subTimeMap3.put("state33", System.currentTimeMillis());
+		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));
+
+		return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);
+	}
+}