You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:33:37 UTC
[07/10] flink git commit: [FLINK-9212][REST] Port
SubtasksAllAccumulatorsHandler to new REST endpoint
[FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint
This closes #5893.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/210abeeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/210abeeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/210abeeb
Branch: refs/heads/release-1.5
Commit: 210abeebe8dd915f38b51a68884e9f42321af8cf
Parents: 300dc4c
Author: zhouhai02 <zh...@meituan.com>
Authored: Sun Apr 22 18:59:11 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 25 09:33:21 2018 +0200
----------------------------------------------------------------------
.../job/SubtasksAllAccumulatorsHandler.java | 82 +++++++++++
.../SubtasksAllAccumulatorsHandlers.java | 75 ++++++++++
.../job/SubtasksAllAccumulatorsInfo.java | 144 +++++++++++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 12 ++
.../job/SubtasksAllAccumulatorsInfoTest.java | 58 ++++++++
5 files changed, 371 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..51efba2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the subtasks all accumulators.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+ public SubtasksAllAccumulatorsHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+ }
+
+ @Override
+ protected SubtasksAllAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
+ JobVertexID jobVertexId = jobVertex.getJobVertexId();
+ int parallelism = jobVertex.getParallelism();
+
+ final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();
+
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+ StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
+ List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+ for (StringifiedAccumulatorResult acc : accs) {
+ userAccumulators.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+ }
+
+ subtaskAccumulatorsInfos.add(
+ new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+ vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
+ vertex.getCurrentExecutionAttempt().getAttemptNumber(),
+ locationString,
+ userAccumulators
+ ));
+ }
+
+ return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
new file mode 100644
index 0000000..e178c93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsHandlers implements MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+ private static final SubtasksAllAccumulatorsHandlers INSTANCE = new SubtasksAllAccumulatorsHandlers();
+
+ public static final String URL = "/jobs" +
+ "/:" + JobIDPathParameter.KEY +
+ "/vertices" +
+ "/:" + JobVertexIdPathParameter.KEY +
+ "/subtasks/accumulators";
+
+ private SubtasksAllAccumulatorsHandlers() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SubtasksAllAccumulatorsInfo> getResponseClass() {
+ return SubtasksAllAccumulatorsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobVertexMessageParameters getUnresolvedMessageParameters() {
+ return new JobVertexMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static SubtasksAllAccumulatorsHandlers getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
new file mode 100644
index 0000000..ee2535f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+ public static final String FIELD_NAME_PARALLELISM = "parallelism";
+ public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+ @JsonSerialize(using = JobVertexIDSerializer.class)
+ private final JobVertexID jobVertexId;
+
+ @JsonProperty(FIELD_NAME_PARALLELISM)
+ private final int parallelism;
+
+ @JsonProperty(FIELD_NAME_SUBTASKS)
+ private final Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos;
+
+ @JsonCreator
+ public SubtasksAllAccumulatorsInfo(
+ @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexId,
+ @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+ @JsonProperty(FIELD_NAME_SUBTASKS) Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos) {
+ this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+ this.parallelism = parallelism;
+ this.subtaskAccumulatorsInfos = Preconditions.checkNotNull(subtaskAccumulatorsInfos);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtasksAllAccumulatorsInfo that = (SubtasksAllAccumulatorsInfo) o;
+ return Objects.equals(jobVertexId, that.jobVertexId) &&
+ parallelism == that.parallelism &&
+ Objects.equals(subtaskAccumulatorsInfos, that.subtaskAccumulatorsInfos);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+ }
+
+ // ---------------------------------------------------
+ // Static inner classes
+ // ---------------------------------------------------
+
+ /**
+ * Detailed information about subtask accumulators.
+ */
+ public static class SubtaskAccumulatorsInfo {
+ public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+ public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+ public static final String FIELD_NAME_HOST = "host";
+ public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+ private final int subtaskIndex;
+
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+ private final int attemptNum;
+
+ @JsonProperty(FIELD_NAME_HOST)
+ private final String host;
+
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+ private final Collection<UserAccumulator> userAccumulators;
+
+ @JsonCreator
+ public SubtaskAccumulatorsInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+ @JsonProperty(FIELD_NAME_HOST) String host,
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulators) {
+
+ this.subtaskIndex = subtaskIndex;
+ this.attemptNum = attemptNum;
+ this.host = Preconditions.checkNotNull(host);
+ this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskAccumulatorsInfo that = (SubtaskAccumulatorsInfo) o;
+ return subtaskIndex == that.subtaskIndex &&
+ attemptNum == that.attemptNum &&
+ Objects.equals(host, that.host) &&
+ Objects.equals(userAccumulators, that.userAccumulators);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtaskIndex, attemptNum, host, userAccumulators);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 0ea7550..1a67d92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
@@ -94,6 +95,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHandlers;
import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
@@ -318,6 +320,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
executionGraphCache,
executor);
+ SubtasksAllAccumulatorsHandler subtasksAllAccumulatorsHandler = new SubtasksAllAccumulatorsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ SubtasksAllAccumulatorsHandlers.getInstance(),
+ executionGraphCache,
+ executor);
+
TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
restAddressFuture,
leaderRetriever,
@@ -575,6 +586,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+ handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), subtasksAllAccumulatorsHandler));
handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
new file mode 100644
index 0000000..2a71239
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtasksAllAccumulatorsInfo}.
+ */
+public class SubtasksAllAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtasksAllAccumulatorsInfo> {
+ @Override
+ protected Class<SubtasksAllAccumulatorsInfo> getTestResponseClass() {
+ return SubtasksAllAccumulatorsInfo.class;
+ }
+
+ @Override
+ protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
+ List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>(3);
+
+ List<UserAccumulator> userAccumulators = new ArrayList<>(2);
+ userAccumulators.add(new UserAccumulator("test name1", "test type1", "test value1"));
+ userAccumulators.add(new UserAccumulator("test name2", "test type2", "test value2"));
+
+ for (int i = 0; i < 3; ++i) {
+ subtaskAccumulatorsInfos.add(
+ new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+ i,
+ i,
+ "host-" + String.valueOf(i),
+ userAccumulators
+ ));
+
+ }
+ return new SubtasksAllAccumulatorsInfo(new JobVertexID(),
+ 4,
+ subtaskAccumulatorsInfos);
+ }
+}