You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/14 21:52:35 UTC

flink git commit: [FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to new a REST handler

Repository: flink
Updated Branches:
  refs/heads/master dc9a4f2f6 -> de30d1654


[FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to new a REST handler

This closes #5287.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de30d165
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de30d165
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de30d165

Branch: refs/heads/master
Commit: de30d16547d2a0392c2e311934830605de6ebd9e
Parents: dc9a4f2
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Jan 12 21:37:48 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jan 14 19:16:22 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/IOMetrics.java |  10 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |  88 ++++++++++
 .../SubtaskExecutionAttemptDetailsHandler.java  |  41 +----
 .../SubtaskCurrentAttemptDetailsHeaders.java    |  77 ++++++++
 .../job/SubtaskExecutionAttemptDetailsInfo.java |  39 +++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  14 ++
 ...SubtaskCurrentAttemptDetailsHandlerTest.java | 174 +++++++++++++++++++
 ...btaskExecutionAttemptDetailsHandlerTest.java | 132 +++++---------
 8 files changed, 448 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 9da3acd..806de0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -57,11 +57,11 @@ public class IOMetrics implements Serializable {
 	}
 
 	public IOMetrics(
-			int numBytesInLocal,
-			int numBytesInRemote,
-			int numBytesOut,
-			int numRecordsIn,
-			int numRecordsOut,
+			long numBytesInLocal,
+			long numBytesInRemote,
+			long numBytesOut,
+			long numRecordsIn,
+			long numRecordsOut,
 			double numBytesInLocalPerSecond,
 			double numBytesInRemotePerSecond,
 			double numBytesOutPerSecond,

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
new file mode 100644
index 0000000..e0f29cf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler providing details about a single task execution attempt.
+ */
+public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
+
+	private final MetricFetcher<?> metricFetcher;
+
+	public SubtaskCurrentAttemptDetailsHandler(
+		CompletableFuture<String> localRestAddress,
+		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+		Time timeout,
+		Map<String, String> responseHeaders,
+		MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
+		ExecutionGraphCache executionGraphCache,
+		Executor executor,
+		MetricFetcher<?> metricFetcher) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+	}
+
+	@Override
+	protected SubtaskExecutionAttemptDetailsInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> request,
+			AccessExecutionVertex executionVertex) throws RestHandlerException {
+
+		final AccessExecution execution = executionVertex.getCurrentExecutionAttempt();
+
+		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
+
+		final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		ioMetrics.addIOMetrics(
+			execution,
+			metricFetcher,
+			jobID.toString(),
+			jobVertexID.toString()
+		);
+
+		return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index c71b88f..b781ee7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -34,8 +33,6 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
-import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
@@ -82,50 +79,18 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
 			AccessExecution execution) throws RestHandlerException {
 
-		final ExecutionState status = execution.getState();
-		final long now = System.currentTimeMillis();
-
-		final TaskManagerLocation location = execution.getAssignedResourceLocation();
-		final String locationString = location == null ? "(unassigned)" : location.getHostname();
-
-		long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
-		if (startTime == 0) {
-			startTime = -1;
-		}
-		final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
-		final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
-
-		final MutableIOMetrics counts = new MutableIOMetrics();
+		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
 
 		final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
 		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
-		counts.addIOMetrics(
+		ioMetrics.addIOMetrics(
 			execution,
 			metricFetcher,
 			jobID.toString(),
 			jobVertexID.toString()
 		);
 
-		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
-			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
-			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
-			counts.getNumBytesOut(),
-			counts.isNumBytesOutComplete(),
-			counts.getNumRecordsIn(),
-			counts.isNumRecordsInComplete(),
-			counts.getNumRecordsOut(),
-			counts.isNumRecordsOutComplete());
-
-		return new SubtaskExecutionAttemptDetailsInfo(
-			execution.getParallelSubtaskIndex(),
-			status,
-			execution.getAttemptNumber(),
-			locationString,
-			startTime,
-			endTime,
-			duration,
-			ioMetricsInfo
-		);
+		return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
new file mode 100644
index 0000000..e64bf3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtaskCurrentAttemptDetailsHandler}.
+ */
+public class SubtaskCurrentAttemptDetailsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
+
+	private static final SubtaskCurrentAttemptDetailsHeaders INSTANCE = new SubtaskCurrentAttemptDetailsHeaders();
+
+	public static final String URL = String.format(
+		"/jobs/:%s/vertices/:%s/subtasks/:%s",
+		JobIDPathParameter.KEY,
+		JobVertexIdPathParameter.KEY,
+		SubtaskIndexPathParameter.KEY);
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
+		return SubtaskExecutionAttemptDetailsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public SubtaskMessageParameters getUnresolvedMessageParameters() {
+		return new SubtaskMessageParameters();
+	}
+
+	public static SubtaskCurrentAttemptDetailsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 2dc00e7..8d19b1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -151,4 +154,40 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
 	public int hashCode() {
 		return Objects.hash(subtaskIndex, status, attempt, host, startTime, endTime, duration, ioMetricsInfo);
 	}
+
+	public static SubtaskExecutionAttemptDetailsInfo create(AccessExecution execution, MutableIOMetrics ioMetrics) {
+		final ExecutionState status = execution.getState();
+		final long now = System.currentTimeMillis();
+
+		final TaskManagerLocation location = execution.getAssignedResourceLocation();
+		final String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+		long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
+		if (startTime == 0) {
+			startTime = -1;
+		}
+		final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
+		final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+			ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(),
+			ioMetrics.isNumBytesInLocalComplete() && ioMetrics.isNumBytesInRemoteComplete(),
+			ioMetrics.getNumBytesOut(),
+			ioMetrics.isNumBytesOutComplete(),
+			ioMetrics.getNumRecordsIn(),
+			ioMetrics.isNumRecordsInComplete(),
+			ioMetrics.getNumRecordsOut(),
+			ioMetrics.isNumRecordsOutComplete());
+
+		return new SubtaskExecutionAttemptDetailsInfo(
+			execution.getParallelSubtaskIndex(),
+			status,
+			execution.getAttemptNumber(),
+			locationString,
+			startTime,
+			endTime,
+			duration,
+			ioMetricsInfo
+		);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index cbad589..7ef17d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
@@ -76,6 +77,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
@@ -389,6 +391,17 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			executor
 		);
 
+		final SubtaskCurrentAttemptDetailsHandler subtaskCurrentAttemptDetailsHandler = new SubtaskCurrentAttemptDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			null,
+			executionGraphCache,
+			executor,
+			metricFetcher
+		);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -432,6 +445,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
 		handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
 		handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
+		handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..895bdac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests of {@link SubtaskCurrentAttemptDetailsHandler}.
+ */
+public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
+
+	@Test
+	public void testHandleRequest() throws Exception {
+
+		// Prepare the execution graph.
+		final JobID jobID = new JobID();
+		final JobVertexID jobVertexID = new JobVertexID();
+
+		// The testing subtask.
+		final long deployingTs = System.currentTimeMillis() - 1024;
+		final long finishedTs = System.currentTimeMillis();
+
+		final long bytesInLocal = 1L;
+		final long bytesInRemote = 2L;
+		final long bytesOut = 10L;
+		final long recordsIn = 20L;
+		final long recordsOut = 30L;
+
+		final IOMetrics ioMetrics = new IOMetrics(
+			bytesInLocal,
+			bytesInRemote,
+			bytesOut,
+			recordsIn,
+			recordsOut,
+			0.0,
+			0.0,
+			0.0,
+			0.0,
+			0.0);
+
+		final long[] timestamps = new long[ExecutionState.values().length];
+		timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
+		final ExecutionState expectedState = ExecutionState.FINISHED;
+
+		timestamps[expectedState.ordinal()] = finishedTs;
+
+		final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
+
+		final int subtaskIndex = 1;
+		final int attempt = 2;
+		final ArchivedExecution execution = new ArchivedExecution(
+			new StringifiedAccumulatorResult[0],
+			ioMetrics,
+			new ExecutionAttemptID(),
+			attempt,
+			expectedState,
+			null,
+			assignedResourceLocation,
+			subtaskIndex,
+			timestamps);
+
+		final ArchivedExecutionVertex executionVertex = new ArchivedExecutionVertex(
+			subtaskIndex,
+			"Test archived execution vertex",
+			execution,
+			new EvictingBoundedList<>(0));
+
+		// Instance the handler.
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
+
+		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+			() -> null,
+			path -> null,
+			TestingUtils.defaultExecutor(),
+			Time.milliseconds(1000L));
+
+		final SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(
+			CompletableFuture.completedFuture("127.0.0.1:9527"),
+			() -> null,
+			Time.milliseconds(100),
+			restHandlerConfiguration.getResponseHeaders(),
+			null,
+			new ExecutionGraphCache(
+				restHandlerConfiguration.getTimeout(),
+				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+			TestingUtils.defaultExecutor(),
+			metricFetcher);
+
+		final HashMap<String, String> receivedPathParameters = new HashMap<>(2);
+		receivedPathParameters.put(JobIDPathParameter.KEY, jobID.toString());
+		receivedPathParameters.put(JobVertexIdPathParameter.KEY, jobVertexID.toString());
+
+		final HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			new SubtaskMessageParameters(),
+			receivedPathParameters,
+			Collections.emptyMap());
+
+		// Handle request.
+		final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest(request, executionVertex);
+
+		// Verify
+		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+			bytesInLocal + bytesInRemote,
+			true,
+			bytesOut,
+			true,
+			recordsIn,
+			true,
+			recordsOut,
+			true
+		);
+
+		final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo(
+			subtaskIndex,
+			expectedState,
+			attempt,
+			assignedResourceLocation.getHostname(),
+			deployingTs,
+			finishedTs,
+			finishedTs - deployingTs,
+			ioMetricsInfo
+		);
+
+		assertEquals(expectedDetailsInfo, detailsInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index e56b0e1..9dbc1de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -18,29 +18,21 @@
 
 package org.apache.flink.runtime.rest.handler.job;
 
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -55,8 +47,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
@@ -76,59 +66,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 
 		// The testing subtask.
 		final int subtaskIndex = 1;
-		final ExecutionState expectedState = ExecutionState.SCHEDULED;
+		final ExecutionState expectedState = ExecutionState.FINISHED;
 		final int attempt = 0;
 
 		final StringifiedAccumulatorResult[] emptyAccumulators = new StringifiedAccumulatorResult[0];
-		final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraph(
-			jobID,
-			"job name",
-			Collections.singletonMap(
-				jobVertexId,
-				new ArchivedExecutionJobVertex(
-					new ArchivedExecutionVertex[]{
-						null, // the first subtask won't be queried
-						new ArchivedExecutionVertex(
-							subtaskIndex,
-							"test task",
-							new ArchivedExecution(
-								emptyAccumulators,
-								null,
-								new ExecutionAttemptID(),
-								attempt,
-								expectedState,
-								null,
-								null,
-								subtaskIndex,
-								new long[ExecutionState.values().length]),
-							new EvictingBoundedList<>(0)
-						)
-					},
-					jobVertexId,
-					"test",
-					1,
-					1,
-					emptyAccumulators)
-			),
-			Collections.emptyList(),
-			new long[0],
-			JobStatus.FINISHED,
-			null,
-			"jsonPlan",
-			emptyAccumulators,
-			Collections.emptyMap(),
-			new ArchivedExecutionConfig(new ExecutionConfig()),
-			false,
-			null,
-			null);
-
-		// Change some fields so we can make it different from other sub tasks.
-		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
-			() -> null,
-			path -> null,
-			TestingUtils.defaultExecutor(),
-			Time.milliseconds(1000L));
-		final MetricStore metricStore = metricFetcher.getMetricStore();
 
 		final long bytesInLocal = 1L;
 		final long bytesInRemote = 2L;
@@ -136,25 +77,49 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 		final long recordsIn = 20L;
 		final long recordsOut = 30L;
 
-		Collection<Tuple2<String, Long>> metricValues = Arrays.asList(
-			Tuple2.of(MetricNames.IO_NUM_BYTES_IN_LOCAL, bytesInLocal),
-			Tuple2.of(MetricNames.IO_NUM_BYTES_IN_REMOTE, bytesInRemote),
-			Tuple2.of(MetricNames.IO_NUM_BYTES_OUT, bytesOut),
-			Tuple2.of(MetricNames.IO_NUM_RECORDS_IN, recordsIn),
-			Tuple2.of(MetricNames.IO_NUM_RECORDS_OUT, recordsOut));
-
-		final QueryScopeInfo.TaskQueryScopeInfo queryScopeInfo = new QueryScopeInfo.TaskQueryScopeInfo(
-			jobID.toString(),
-			jobVertexId.toString(),
-			subtaskIndex);
-
-		for (Tuple2<String, Long> metricValue : metricValues) {
-			metricStore.add(
-				new MetricDump.CounterDump(
-					queryScopeInfo,
-					metricValue.f0,
-					metricValue.f1));
-		}
+		final IOMetrics ioMetrics = new IOMetrics(
+			bytesInLocal,
+			bytesInRemote,
+			bytesOut,
+			recordsIn,
+			recordsOut,
+			0.0,
+			0.0,
+			0.0,
+			0.0,
+			0.0);
+
+		final ArchivedExecutionJobVertex archivedExecutionJobVertex = new ArchivedExecutionJobVertex(
+			new ArchivedExecutionVertex[]{
+				null, // the first subtask won't be queried
+				new ArchivedExecutionVertex(
+					subtaskIndex,
+					"test task",
+					new ArchivedExecution(
+						emptyAccumulators,
+						ioMetrics,
+						new ExecutionAttemptID(),
+						attempt,
+						expectedState,
+						null,
+						null,
+						subtaskIndex,
+						new long[ExecutionState.values().length]),
+					new EvictingBoundedList<>(0)
+				)
+			},
+			jobVertexId,
+			"test",
+			1,
+			1,
+			emptyAccumulators);
+
+		// Change some fields so we can make it different from other sub tasks.
+		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+			() -> null,
+			path -> null,
+			TestingUtils.defaultExecutor(),
+			Time.milliseconds(1000L));
 
 		// Instance the handler.
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
@@ -169,8 +134,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 				restHandlerConfiguration.getTimeout(),
 				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
 			TestingUtils.defaultExecutor(),
-			metricFetcher
-		);
+			metricFetcher);
 
 		final HashMap<String, String> receivedPathParameters = new HashMap<>(4);
 		receivedPathParameters.put(JobIDPathParameter.KEY, jobID.toString());
@@ -188,7 +152,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 		// Handle request.
 		final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest(
 			request,
-			executionGraph.getJobVertex(jobVertexId));
+			archivedExecutionJobVertex);
 
 		// Verify
 		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
@@ -208,7 +172,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			attempt,
 			"(unassigned)",
 			-1L,
-			-1L,
+			0L,
 			-1L,
 			ioMetricsInfo
 		);