You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/14 16:32:34 UTC
flink git commit: [FLINK-8369] Migrate
SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint
Repository: flink
Updated Branches:
refs/heads/master 3920e9a47 -> dc9a4f2f6
[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint
This closes #5285.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc9a4f2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc9a4f2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc9a4f2f
Branch: refs/heads/master
Commit: dc9a4f2f63e977f42e2f19a347c7010057dc4c69
Parents: 3920e9a
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Jan 12 16:33:08 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jan 14 17:27:11 2018 +0100
----------------------------------------------------------------------
.../job/JobVertexAccumulatorsHandler.java | 5 +-
...taskExecutionAttemptAccumulatorsHandler.java | 85 +++++++++++++
.../messages/JobVertexAccumulatorsInfo.java | 53 +-------
...taskExecutionAttemptAccumulatorsHeaders.java | 78 ++++++++++++
...SubtaskExecutionAttemptAccumulatorsInfo.java | 85 +++++++++++++
.../rest/messages/job/UserAccumulator.java | 74 ++++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 13 ++
...ExecutionAttemptAccumulatorsHandlerTest.java | 121 +++++++++++++++++++
.../messages/JobVertexAccumulatorsInfoTest.java | 10 +-
...askExecutionAttemptAccumulatorsInfoTest.java | 53 ++++++++
10 files changed, 519 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
index 52e5632..412cd94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -65,11 +66,11 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexHandler<JobVe
AccessExecutionJobVertex jobVertex) throws RestHandlerException {
StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
- ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+ ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
for (StringifiedAccumulatorResult acc : accs) {
userAccumulatorList.add(
- new JobVertexAccumulatorsInfo.UserAccumulator(
+ new UserAccumulator(
acc.getName(),
acc.getType(),
acc.getValue()));
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
new file mode 100644
index 0000000..e3b1719
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the subtask execution attempt accumulators.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> {
+ /**
+ * Instantiates a new Abstract job vertex handler.
+ *
+ * @param localRestAddress the local rest address
+ * @param leaderRetriever the leader retriever
+ * @param timeout the timeout
+ * @param responseHeaders the response headers
+ * @param messageHeaders the message headers
+ * @param executionGraphCache the execution graph cache
+ * @param executor the executor
+ */
+ public SubtaskExecutionAttemptAccumulatorsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+ }
+
+ @Override
+ protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest(
+ HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
+ AccessExecution execution) throws RestHandlerException {
+
+ final StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
+ final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+
+ for (StringifiedAccumulatorResult acc : accs) {
+ userAccumulatorList.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+ }
+
+ return new SubtaskExecutionAttemptAccumulatorsInfo(
+ execution.getParallelSubtaskIndex(),
+ execution.getAttemptNumber(),
+ execution.getAttemptId().toString(),
+ userAccumulatorList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
index 428829e..fd56e1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -66,56 +67,4 @@ public class JobVertexAccumulatorsInfo implements ResponseBody {
public int hashCode() {
return Objects.hash(id, userAccumulatorList);
}
-
- //---------------------------------------------------------------------------------
- // Static helper classes
- //---------------------------------------------------------------------------------
-
- /**
- * Json serializer for the {@link JobVertexAccumulatorsInfo}.
- */
- public static final class UserAccumulator {
-
- public static final String FIELD_NAME_ACC_NAME = "name";
- public static final String FIELD_NAME_ACC_TYPE = "type";
- public static final String FIELD_NAME_ACC_VALUE = "value";
-
- @JsonProperty(FIELD_NAME_ACC_NAME)
- private String name;
-
- @JsonProperty(FIELD_NAME_ACC_TYPE)
- private String type;
-
- @JsonProperty(FIELD_NAME_ACC_VALUE)
- private String value;
-
- @JsonCreator
- public UserAccumulator(
- @JsonProperty(FIELD_NAME_ACC_NAME) String name,
- @JsonProperty(FIELD_NAME_ACC_TYPE) String type,
- @JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
- this.name = Preconditions.checkNotNull(name);
- this.type = Preconditions.checkNotNull(type);
- this.value = Preconditions.checkNotNull(value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- UserAccumulator that = (UserAccumulator) o;
- return Objects.equals(name, that.name) &&
- Objects.equals(type, that.type) &&
- Objects.equals(value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, type, value);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
new file mode 100644
index 0000000..d162156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+
+ private static final SubtaskExecutionAttemptAccumulatorsHeaders INSTANCE = new SubtaskExecutionAttemptAccumulatorsHeaders();
+
+ public static final String URL = String.format(
+ "/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s/accumulators",
+ JobIDPathParameter.KEY,
+ JobVertexIdPathParameter.KEY,
+ SubtaskIndexPathParameter.KEY,
+ SubtaskAttemptPathParameter.KEY);
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
+ return SubtaskExecutionAttemptDetailsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public SubtaskAttemptMessageParameters getUnresolvedMessageParameters() {
+ return new SubtaskAttemptMessageParameters();
+ }
+
+ public static SubtaskExecutionAttemptAccumulatorsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
new file mode 100644
index 0000000..b6dbe27
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+ public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+ public static final String FIELD_NAME_ID = "id";
+ public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+ private final int subtaskIndex;
+
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+ private final int attemptNum;
+
+ @JsonProperty(FIELD_NAME_ID)
+ private final String id;
+
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+ private final Collection<UserAccumulator> userAccumulatorList;
+
+ @JsonCreator
+ public SubtaskExecutionAttemptAccumulatorsInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+ @JsonProperty(FIELD_NAME_ID) String id,
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulatorList) {
+
+ this.subtaskIndex = Preconditions.checkNotNull(subtaskIndex);
+ this.attemptNum = Preconditions.checkNotNull(attemptNum);
+ this.id = Preconditions.checkNotNull(id);
+ this.userAccumulatorList = Preconditions.checkNotNull(userAccumulatorList);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskExecutionAttemptAccumulatorsInfo that = (SubtaskExecutionAttemptAccumulatorsInfo) o;
+ return subtaskIndex == that.subtaskIndex &&
+ attemptNum == that.attemptNum &&
+ Objects.equals(id, that.id) &&
+ Objects.equals(userAccumulatorList, that.userAccumulatorList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtaskIndex, attemptNum, id, userAccumulatorList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
new file mode 100644
index 0000000..c47b81f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * User accumulator info.
+ */
+public final class UserAccumulator {
+
+ public static final String FIELD_NAME_ACC_NAME = "name";
+ public static final String FIELD_NAME_ACC_TYPE = "type";
+ public static final String FIELD_NAME_ACC_VALUE = "value";
+
+ @JsonProperty(FIELD_NAME_ACC_NAME)
+ private String name;
+
+ @JsonProperty(FIELD_NAME_ACC_TYPE)
+ private String type;
+
+ @JsonProperty(FIELD_NAME_ACC_VALUE)
+ private String value;
+
+ @JsonCreator
+ public UserAccumulator(
+ @JsonProperty(FIELD_NAME_ACC_NAME) String name,
+ @JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+ @JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+ this.name = Preconditions.checkNotNull(name);
+ this.type = Preconditions.checkNotNull(type);
+ this.value = Preconditions.checkNotNull(value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UserAccumulator that = (UserAccumulator) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(type, that.type) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index faf1ae9..cbad589 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
@@ -75,6 +76,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
@@ -377,6 +379,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
executor,
metricFetcher);
+ final SubtaskExecutionAttemptAccumulatorsHandler subtaskExecutionAttemptAccumulatorsHandler = new SubtaskExecutionAttemptAccumulatorsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ null,
+ executionGraphCache,
+ executor
+ );
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<T>> optWebContent;
@@ -419,6 +431,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler));
handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
+ handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..abc8eec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests of {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
+
+ @Test
+ public void testHandleRequest() throws Exception {
+
+ // Instance the handler.
+ final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
+
+ final SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(
+ CompletableFuture.completedFuture("127.0.0.1:9527"),
+ () -> null,
+ Time.milliseconds(100L),
+ restHandlerConfiguration.getResponseHeaders(),
+ null,
+ new ExecutionGraphCache(
+ restHandlerConfiguration.getTimeout(),
+ Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+ TestingUtils.defaultExecutor());
+
+ // Instance a empty request.
+ final HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new SubtaskAttemptMessageParameters()
+ );
+
+ final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(2);
+ userAccumulators.put("IntCounter", new IntCounter(10));
+ userAccumulators.put("LongCounter", new LongCounter(100L));
+
+ // Instance the expected result.
+ final StringifiedAccumulatorResult[] accumulatorResults =
+ StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
+
+ final int attemptNum = 1;
+ final int subtaskIndex = 2;
+
+ // Instance the tested execution.
+ final ArchivedExecution execution = new ArchivedExecution(
+ accumulatorResults,
+ null,
+ new ExecutionAttemptID(),
+ attemptNum,
+ ExecutionState.FINISHED,
+ null,
+ null,
+ subtaskIndex,
+ new long[ExecutionState.values().length]);
+
+ // Invoke tested method.
+ final SubtaskExecutionAttemptAccumulatorsInfo accumulatorsInfo = handler.handleRequest(request, execution);
+
+ final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(userAccumulators.size());
+ for (StringifiedAccumulatorResult accumulatorResult : accumulatorResults) {
+ userAccumulatorList.add(
+ new UserAccumulator(
+ accumulatorResult.getName(),
+ accumulatorResult.getType(),
+ accumulatorResult.getValue()));
+ }
+
+ final SubtaskExecutionAttemptAccumulatorsInfo expected = new SubtaskExecutionAttemptAccumulatorsInfo(
+ subtaskIndex,
+ attemptNum,
+ execution.getAttemptId().toString(),
+ userAccumulatorList);
+
+ // Verify.
+ assertEquals(expected, accumulatorsInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
index 21fbcef..bb5c433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.rest.messages;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+
import java.util.ArrayList;
import java.util.List;
@@ -32,16 +34,16 @@ public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBa
@Override
protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
- List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(3);
- userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ List<UserAccumulator> userAccumulatorList = new ArrayList<>(3);
+ userAccumulatorList.add(new UserAccumulator(
"test name1",
"test type1",
"test value1"));
- userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ userAccumulatorList.add(new UserAccumulator(
"test name2",
"test type2",
"test value2"));
- userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ userAccumulatorList.add(new UserAccumulator(
"test name3",
"test type3",
"test value3"));
http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
new file mode 100644
index 0000000..feea64d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtaskExecutionAttemptAccumulatorsInfo}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtaskExecutionAttemptAccumulatorsInfo> {
+
+ @Override
+ protected Class<SubtaskExecutionAttemptAccumulatorsInfo> getTestResponseClass() {
+ return SubtaskExecutionAttemptAccumulatorsInfo.class;
+ }
+
+ @Override
+ protected SubtaskExecutionAttemptAccumulatorsInfo getTestResponseInstance() throws Exception {
+
+ final List<UserAccumulator> userAccumulatorList = new ArrayList<>();
+
+ userAccumulatorList.add(new UserAccumulator("name1", "type1", "value1"));
+ userAccumulatorList.add(new UserAccumulator("name2", "type1", "value1"));
+ userAccumulatorList.add(new UserAccumulator("name3", "type2", "value3"));
+
+ return new SubtaskExecutionAttemptAccumulatorsInfo(
+ 1,
+ 2,
+ new ExecutionAttemptID().toString(),
+ userAccumulatorList
+ );
+ }
+}