You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/14 21:52:35 UTC
flink git commit: [FLINK-8367] Migrate
SubtaskCurrentAttemptDetailsHandler to a new REST handler
Repository: flink
Updated Branches:
refs/heads/master dc9a4f2f6 -> de30d1654
[FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to a new REST handler
This closes #5287.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de30d165
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de30d165
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de30d165
Branch: refs/heads/master
Commit: de30d16547d2a0392c2e311934830605de6ebd9e
Parents: dc9a4f2
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Jan 12 21:37:48 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jan 14 19:16:22 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/IOMetrics.java | 10 +-
.../SubtaskCurrentAttemptDetailsHandler.java | 88 ++++++++++
.../SubtaskExecutionAttemptDetailsHandler.java | 41 +----
.../SubtaskCurrentAttemptDetailsHeaders.java | 77 ++++++++
.../job/SubtaskExecutionAttemptDetailsInfo.java | 39 +++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 14 ++
...SubtaskCurrentAttemptDetailsHandlerTest.java | 174 +++++++++++++++++++
...btaskExecutionAttemptDetailsHandlerTest.java | 132 +++++---------
8 files changed, 448 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 9da3acd..806de0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -57,11 +57,11 @@ public class IOMetrics implements Serializable {
}
public IOMetrics(
- int numBytesInLocal,
- int numBytesInRemote,
- int numBytesOut,
- int numRecordsIn,
- int numRecordsOut,
+ long numBytesInLocal,
+ long numBytesInRemote,
+ long numBytesOut,
+ long numRecordsIn,
+ long numRecordsOut,
double numBytesInLocalPerSecond,
double numBytesInRemotePerSecond,
double numBytesOutPerSecond,
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
new file mode 100644
index 0000000..e0f29cf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler providing details about the current execution attempt of a subtask.
+ */
+public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
+
+ private final MetricFetcher<?> metricFetcher; // passed to MutableIOMetrics#addIOMetrics in handleRequest
+
+ public SubtaskCurrentAttemptDetailsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ MetricFetcher<?> metricFetcher) {
+
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+
+ this.metricFetcher = Preconditions.checkNotNull(metricFetcher); // rejects a null fetcher at construction time
+ }
+
+ @Override
+ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
+ HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> request,
+ AccessExecutionVertex executionVertex) throws RestHandlerException {
+
+ final AccessExecution execution = executionVertex.getCurrentExecutionAttempt(); // always the vertex's current attempt; no attempt number is read from the request
+
+ final MutableIOMetrics ioMetrics = new MutableIOMetrics();
+
+ final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ ioMetrics.addIOMetrics( // fills ioMetrics for this attempt; job and vertex ids are passed in their string form
+ execution,
+ metricFetcher,
+ jobID.toString(),
+ jobVertexID.toString()
+ );
+
+ return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics); // the response body is assembled by the static factory on the response type
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index c71b88f..b781ee7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -34,8 +33,6 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
-import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
@@ -82,50 +79,18 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
AccessExecution execution) throws RestHandlerException {
- final ExecutionState status = execution.getState();
- final long now = System.currentTimeMillis();
-
- final TaskManagerLocation location = execution.getAssignedResourceLocation();
- final String locationString = location == null ? "(unassigned)" : location.getHostname();
-
- long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
- if (startTime == 0) {
- startTime = -1;
- }
- final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
- final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
-
- final MutableIOMetrics counts = new MutableIOMetrics();
+ final MutableIOMetrics ioMetrics = new MutableIOMetrics();
final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
- counts.addIOMetrics(
+ ioMetrics.addIOMetrics(
execution,
metricFetcher,
jobID.toString(),
jobVertexID.toString()
);
- final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
- counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
- counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
- counts.getNumBytesOut(),
- counts.isNumBytesOutComplete(),
- counts.getNumRecordsIn(),
- counts.isNumRecordsInComplete(),
- counts.getNumRecordsOut(),
- counts.isNumRecordsOutComplete());
-
- return new SubtaskExecutionAttemptDetailsInfo(
- execution.getParallelSubtaskIndex(),
- status,
- execution.getAttemptNumber(),
- locationString,
- startTime,
- endTime,
- duration,
- ioMetricsInfo
- );
+ return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
new file mode 100644
index 0000000..e64bf3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtaskCurrentAttemptDetailsHandler}: a GET on {@link #URL} with an empty request body, answered with a {@link SubtaskExecutionAttemptDetailsInfo} and status 200 OK.
+ */
+public class SubtaskCurrentAttemptDetailsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
+
+ private static final SubtaskCurrentAttemptDetailsHeaders INSTANCE = new SubtaskCurrentAttemptDetailsHeaders(); // shared instance handed out by getInstance()
+
+ public static final String URL = String.format(
+ "/jobs/:%s/vertices/:%s/subtasks/:%s",
+ JobIDPathParameter.KEY,
+ JobVertexIdPathParameter.KEY,
+ SubtaskIndexPathParameter.KEY); // the three placeholders are named after the path parameters' KEY constants
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
+ return SubtaskExecutionAttemptDetailsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public SubtaskMessageParameters getUnresolvedMessageParameters() {
+ return new SubtaskMessageParameters(); // a fresh parameters object on every call
+ }
+
+ public static SubtaskCurrentAttemptDetailsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 2dc00e7..8d19b1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -19,8 +19,11 @@
package org.apache.flink.runtime.rest.messages.job;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -151,4 +154,40 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
public int hashCode() {
return Objects.hash(subtaskIndex, status, attempt, host, startTime, endTime, duration, ioMetricsInfo);
}
+
+ public static SubtaskExecutionAttemptDetailsInfo create(AccessExecution execution, MutableIOMetrics ioMetrics) { // builds the response from the execution's state, location and timestamps plus the given I/O metrics
+ final ExecutionState status = execution.getState();
+ final long now = System.currentTimeMillis(); // only used for the duration when no end time is available
+
+ final TaskManagerLocation location = execution.getAssignedResourceLocation();
+ final String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+ long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
+ if (startTime == 0) { // a DEPLOYING timestamp of 0 is reported as -1
+ startTime = -1;
+ }
+ final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1; // -1 unless the current state is terminal; a terminal state's timestamp is passed through as-is
+ final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; // -1 without a start time; otherwise measured up to endTime, or up to now if endTime is not positive
+
+ final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+ ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(), // local and remote input bytes are reported as one sum
+ ioMetrics.isNumBytesInLocalComplete() && ioMetrics.isNumBytesInRemoteComplete(), // the sum counts as complete only if both parts are
+ ioMetrics.getNumBytesOut(),
+ ioMetrics.isNumBytesOutComplete(),
+ ioMetrics.getNumRecordsIn(),
+ ioMetrics.isNumRecordsInComplete(),
+ ioMetrics.getNumRecordsOut(),
+ ioMetrics.isNumRecordsOutComplete());
+
+ return new SubtaskExecutionAttemptDetailsInfo(
+ execution.getParallelSubtaskIndex(),
+ status,
+ execution.getAttemptNumber(),
+ locationString,
+ startTime,
+ endTime,
+ duration,
+ ioMetricsInfo
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index cbad589..7ef17d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
@@ -76,6 +77,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
@@ -389,6 +391,17 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
executor
);
+ final SubtaskCurrentAttemptDetailsHandler subtaskCurrentAttemptDetailsHandler = new SubtaskCurrentAttemptDetailsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ SubtaskCurrentAttemptDetailsHeaders.getInstance(), // must be the real headers (not null): the same instance this handler is registered under below
+ executionGraphCache,
+ executor,
+ metricFetcher
+ );
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<T>> optWebContent;
@@ -432,6 +445,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
+ handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..895bdac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SubtaskCurrentAttemptDetailsHandler}.
+ */
+public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
+
+ @Test
+ public void testHandleRequest() throws Exception {
+
+ // Identifiers that are passed to the handler as path parameters.
+ final JobID jobID = new JobID();
+ final JobVertexID jobVertexID = new JobVertexID();
+
+ // Timestamps of the subtask under test: deployed 1024 ms before it finished.
+ final long deployingTs = System.currentTimeMillis() - 1024;
+ final long finishedTs = System.currentTimeMillis();
+
+ final long bytesInLocal = 1L;
+ final long bytesInRemote = 2L;
+ final long bytesOut = 10L;
+ final long recordsIn = 20L;
+ final long recordsOut = 30L;
+
+ final IOMetrics ioMetrics = new IOMetrics(
+ bytesInLocal,
+ bytesInRemote,
+ bytesOut,
+ recordsIn,
+ recordsOut,
+ 0.0, // the five per-second rates are not part of the expected response below
+ 0.0,
+ 0.0,
+ 0.0,
+ 0.0);
+
+ final long[] timestamps = new long[ExecutionState.values().length]; // one slot per ExecutionState, indexed by ordinal
+ timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
+ final ExecutionState expectedState = ExecutionState.FINISHED;
+
+ timestamps[expectedState.ordinal()] = finishedTs;
+
+ final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
+
+ final int subtaskIndex = 1;
+ final int attempt = 2;
+ final ArchivedExecution execution = new ArchivedExecution(
+ new StringifiedAccumulatorResult[0],
+ ioMetrics,
+ new ExecutionAttemptID(),
+ attempt,
+ expectedState,
+ null,
+ assignedResourceLocation,
+ subtaskIndex,
+ timestamps);
+
+ final ArchivedExecutionVertex executionVertex = new ArchivedExecutionVertex(
+ subtaskIndex,
+ "Test archived execution vertex",
+ execution, // the execution built above is passed as the vertex's execution; the handler is expected to report it
+ new EvictingBoundedList<>(0));
+
+ // Instantiate the handler under test.
+ final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
+
+ final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+ () -> null,
+ path -> null,
+ TestingUtils.defaultExecutor(),
+ Time.milliseconds(1000L));
+
+ final SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(
+ CompletableFuture.completedFuture("127.0.0.1:9527"),
+ () -> null,
+ Time.milliseconds(100),
+ restHandlerConfiguration.getResponseHeaders(),
+ null, // no message headers are supplied; this test calls handleRequest directly
+ new ExecutionGraphCache(
+ restHandlerConfiguration.getTimeout(),
+ Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+ TestingUtils.defaultExecutor(),
+ metricFetcher);
+
+ final HashMap<String, String> receivedPathParameters = new HashMap<>(2);
+ receivedPathParameters.put(JobIDPathParameter.KEY, jobID.toString());
+ receivedPathParameters.put(JobVertexIdPathParameter.KEY, jobVertexID.toString());
+
+ final HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new SubtaskMessageParameters(),
+ receivedPathParameters,
+ Collections.emptyMap());
+
+ // Handle the request.
+ final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest(request, executionVertex);
+
+ // Build the expected response and compare it with the handler's result.
+ final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+ bytesInLocal + bytesInRemote, // input bytes are expected as the sum of local and remote
+ true,
+ bytesOut,
+ true,
+ recordsIn,
+ true,
+ recordsOut,
+ true
+ );
+
+ final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo(
+ subtaskIndex,
+ expectedState,
+ attempt,
+ assignedResourceLocation.getHostname(),
+ deployingTs,
+ finishedTs,
+ finishedTs - deployingTs, // expected duration: from deployment to the FINISHED timestamp
+ ioMetricsInfo
+ );
+
+ assertEquals(expectedDetailsInfo, detailsInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de30d165/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index e56b0e1..9dbc1de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -18,29 +18,21 @@
package org.apache.flink.runtime.rest.handler.job;
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -55,8 +47,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
@@ -76,59 +66,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
// The testing subtask.
final int subtaskIndex = 1;
- final ExecutionState expectedState = ExecutionState.SCHEDULED;
+ final ExecutionState expectedState = ExecutionState.FINISHED;
final int attempt = 0;
final StringifiedAccumulatorResult[] emptyAccumulators = new StringifiedAccumulatorResult[0];
- final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraph(
- jobID,
- "job name",
- Collections.singletonMap(
- jobVertexId,
- new ArchivedExecutionJobVertex(
- new ArchivedExecutionVertex[]{
- null, // the first subtask won't be queried
- new ArchivedExecutionVertex(
- subtaskIndex,
- "test task",
- new ArchivedExecution(
- emptyAccumulators,
- null,
- new ExecutionAttemptID(),
- attempt,
- expectedState,
- null,
- null,
- subtaskIndex,
- new long[ExecutionState.values().length]),
- new EvictingBoundedList<>(0)
- )
- },
- jobVertexId,
- "test",
- 1,
- 1,
- emptyAccumulators)
- ),
- Collections.emptyList(),
- new long[0],
- JobStatus.FINISHED,
- null,
- "jsonPlan",
- emptyAccumulators,
- Collections.emptyMap(),
- new ArchivedExecutionConfig(new ExecutionConfig()),
- false,
- null,
- null);
-
- // Change some fields so we can make it different from other sub tasks.
- final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
- () -> null,
- path -> null,
- TestingUtils.defaultExecutor(),
- Time.milliseconds(1000L));
- final MetricStore metricStore = metricFetcher.getMetricStore();
final long bytesInLocal = 1L;
final long bytesInRemote = 2L;
@@ -136,25 +77,49 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
final long recordsIn = 20L;
final long recordsOut = 30L;
- Collection<Tuple2<String, Long>> metricValues = Arrays.asList(
- Tuple2.of(MetricNames.IO_NUM_BYTES_IN_LOCAL, bytesInLocal),
- Tuple2.of(MetricNames.IO_NUM_BYTES_IN_REMOTE, bytesInRemote),
- Tuple2.of(MetricNames.IO_NUM_BYTES_OUT, bytesOut),
- Tuple2.of(MetricNames.IO_NUM_RECORDS_IN, recordsIn),
- Tuple2.of(MetricNames.IO_NUM_RECORDS_OUT, recordsOut));
-
- final QueryScopeInfo.TaskQueryScopeInfo queryScopeInfo = new QueryScopeInfo.TaskQueryScopeInfo(
- jobID.toString(),
- jobVertexId.toString(),
- subtaskIndex);
-
- for (Tuple2<String, Long> metricValue : metricValues) {
- metricStore.add(
- new MetricDump.CounterDump(
- queryScopeInfo,
- metricValue.f0,
- metricValue.f1));
- }
+ final IOMetrics ioMetrics = new IOMetrics(
+ bytesInLocal,
+ bytesInRemote,
+ bytesOut,
+ recordsIn,
+ recordsOut,
+ 0.0,
+ 0.0,
+ 0.0,
+ 0.0,
+ 0.0);
+
+ final ArchivedExecutionJobVertex archivedExecutionJobVertex = new ArchivedExecutionJobVertex(
+ new ArchivedExecutionVertex[]{
+ null, // the first subtask won't be queried
+ new ArchivedExecutionVertex(
+ subtaskIndex,
+ "test task",
+ new ArchivedExecution(
+ emptyAccumulators,
+ ioMetrics,
+ new ExecutionAttemptID(),
+ attempt,
+ expectedState,
+ null,
+ null,
+ subtaskIndex,
+ new long[ExecutionState.values().length]),
+ new EvictingBoundedList<>(0)
+ )
+ },
+ jobVertexId,
+ "test",
+ 1,
+ 1,
+ emptyAccumulators);
+
+ // Change some fields so we can make it different from other sub tasks.
+ final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+ () -> null,
+ path -> null,
+ TestingUtils.defaultExecutor(),
+ Time.milliseconds(1000L));
// Instance the handler.
final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
@@ -169,8 +134,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
restHandlerConfiguration.getTimeout(),
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
TestingUtils.defaultExecutor(),
- metricFetcher
- );
+ metricFetcher);
final HashMap<String, String> receivedPathParameters = new HashMap<>(4);
receivedPathParameters.put(JobIDPathParameter.KEY, jobID.toString());
@@ -188,7 +152,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
// Handle request.
final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest(
request,
- executionGraph.getJobVertex(jobVertexId));
+ archivedExecutionJobVertex);
// Verify
final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
@@ -208,7 +172,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
attempt,
"(unassigned)",
-1L,
- -1L,
+ 0L,
-1L,
ioMetricsInfo
);