You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/14 16:32:34 UTC

flink git commit: [FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint

Repository: flink
Updated Branches:
  refs/heads/master 3920e9a47 -> dc9a4f2f6


[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint

This closes #5285.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc9a4f2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc9a4f2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc9a4f2f

Branch: refs/heads/master
Commit: dc9a4f2f63e977f42e2f19a347c7010057dc4c69
Parents: 3920e9a
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Jan 12 16:33:08 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jan 14 17:27:11 2018 +0100

----------------------------------------------------------------------
 .../job/JobVertexAccumulatorsHandler.java       |   5 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |  85 +++++++++++++
 .../messages/JobVertexAccumulatorsInfo.java     |  53 +-------
 ...taskExecutionAttemptAccumulatorsHeaders.java |  78 ++++++++++++
 ...SubtaskExecutionAttemptAccumulatorsInfo.java |  85 +++++++++++++
 .../rest/messages/job/UserAccumulator.java      |  74 ++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  13 ++
 ...ExecutionAttemptAccumulatorsHandlerTest.java | 121 +++++++++++++++++++
 .../messages/JobVertexAccumulatorsInfoTest.java |  10 +-
 ...askExecutionAttemptAccumulatorsInfoTest.java |  53 ++++++++
 10 files changed, 519 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
index 52e5632..412cd94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
@@ -65,11 +66,11 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexHandler<JobVe
 			AccessExecutionJobVertex jobVertex) throws RestHandlerException {
 
 		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
-		ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+		ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
 
 		for (StringifiedAccumulatorResult acc : accs) {
 			userAccumulatorList.add(
-				new JobVertexAccumulatorsInfo.UserAccumulator(
+				new UserAccumulator(
 					acc.getName(),
 					acc.getType(),
 					acc.getValue()));

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
new file mode 100644
index 0000000..e3b1719
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the subtask execution attempt accumulators.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> {
+	/**
+	 * Instantiates a new Abstract job vertex handler.
+	 *
+	 * @param localRestAddress    the local rest address
+	 * @param leaderRetriever     the leader retriever
+	 * @param timeout             the timeout
+	 * @param responseHeaders     the response headers
+	 * @param messageHeaders      the message headers
+	 * @param executionGraphCache the execution graph cache
+	 * @param executor            the executor
+	 */
+	public SubtaskExecutionAttemptAccumulatorsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
+			AccessExecution execution) throws RestHandlerException {
+
+		final StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
+		final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+
+		for (StringifiedAccumulatorResult acc : accs) {
+			userAccumulatorList.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+		}
+
+		return new SubtaskExecutionAttemptAccumulatorsInfo(
+			execution.getParallelSubtaskIndex(),
+			execution.getAttemptNumber(),
+			execution.getAttemptId().toString(),
+			userAccumulatorList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
index 428829e..fd56e1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -66,56 +67,4 @@ public class JobVertexAccumulatorsInfo implements ResponseBody {
 	public int hashCode() {
 		return Objects.hash(id, userAccumulatorList);
 	}
-
-	//---------------------------------------------------------------------------------
-	// Static helper classes
-	//---------------------------------------------------------------------------------
-
-	/**
-	 * Json serializer for the {@link JobVertexAccumulatorsInfo}.
-	 */
-	public static final class UserAccumulator {
-
-		public static final String FIELD_NAME_ACC_NAME = "name";
-		public static final String FIELD_NAME_ACC_TYPE = "type";
-		public static final String FIELD_NAME_ACC_VALUE = "value";
-
-		@JsonProperty(FIELD_NAME_ACC_NAME)
-		private String name;
-
-		@JsonProperty(FIELD_NAME_ACC_TYPE)
-		private String type;
-
-		@JsonProperty(FIELD_NAME_ACC_VALUE)
-		private String value;
-
-		@JsonCreator
-		public UserAccumulator(
-				@JsonProperty(FIELD_NAME_ACC_NAME) String name,
-				@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
-				@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
-			this.name = Preconditions.checkNotNull(name);
-			this.type = Preconditions.checkNotNull(type);
-			this.value = Preconditions.checkNotNull(value);
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			UserAccumulator that = (UserAccumulator) o;
-			return Objects.equals(name, that.name) &&
-				Objects.equals(type, that.type) &&
-				Objects.equals(value, that.value);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(name, type, value);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
new file mode 100644
index 0000000..d162156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+
+	private static final SubtaskExecutionAttemptAccumulatorsHeaders INSTANCE = new SubtaskExecutionAttemptAccumulatorsHeaders();
+
+	public static final String URL = String.format(
+		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s/accumulators",
+		JobIDPathParameter.KEY,
+		JobVertexIdPathParameter.KEY,
+		SubtaskIndexPathParameter.KEY,
+		SubtaskAttemptPathParameter.KEY);
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
+		return SubtaskExecutionAttemptDetailsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public SubtaskAttemptMessageParameters getUnresolvedMessageParameters() {
+		return new SubtaskAttemptMessageParameters();
+	}
+
+	public static SubtaskExecutionAttemptAccumulatorsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
new file mode 100644
index 0000000..b6dbe27
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+	public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+	public static final String FIELD_NAME_ID = "id";
+	public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+	@JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+	private final int subtaskIndex;
+
+	@JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+	private final int attemptNum;
+
+	@JsonProperty(FIELD_NAME_ID)
+	private final String id;
+
+	@JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+	private final Collection<UserAccumulator> userAccumulatorList;
+
+	@JsonCreator
+	public SubtaskExecutionAttemptAccumulatorsInfo(
+			@JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+			@JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+			@JsonProperty(FIELD_NAME_ID) String id,
+			@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulatorList) {
+
+		this.subtaskIndex = Preconditions.checkNotNull(subtaskIndex);
+		this.attemptNum = Preconditions.checkNotNull(attemptNum);
+		this.id = Preconditions.checkNotNull(id);
+		this.userAccumulatorList = Preconditions.checkNotNull(userAccumulatorList);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SubtaskExecutionAttemptAccumulatorsInfo that = (SubtaskExecutionAttemptAccumulatorsInfo) o;
+		return subtaskIndex == that.subtaskIndex &&
+			attemptNum == that.attemptNum &&
+			Objects.equals(id, that.id) &&
+			Objects.equals(userAccumulatorList, that.userAccumulatorList);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(subtaskIndex, attemptNum, id, userAccumulatorList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
new file mode 100644
index 0000000..c47b81f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/UserAccumulator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * User accumulator info.
+ */
+public final class UserAccumulator {
+
+	public static final String FIELD_NAME_ACC_NAME = "name";
+	public static final String FIELD_NAME_ACC_TYPE = "type";
+	public static final String FIELD_NAME_ACC_VALUE = "value";
+
+	@JsonProperty(FIELD_NAME_ACC_NAME)
+	private String name;
+
+	@JsonProperty(FIELD_NAME_ACC_TYPE)
+	private String type;
+
+	@JsonProperty(FIELD_NAME_ACC_VALUE)
+	private String value;
+
+	@JsonCreator
+	public UserAccumulator(
+			@JsonProperty(FIELD_NAME_ACC_NAME) String name,
+			@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+			@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+		this.name = Preconditions.checkNotNull(name);
+		this.type = Preconditions.checkNotNull(type);
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		UserAccumulator that = (UserAccumulator) o;
+		return Objects.equals(name, that.name) &&
+			Objects.equals(type, that.type) &&
+			Objects.equals(value, that.value);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(name, type, value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index faf1ae9..cbad589 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
@@ -75,6 +76,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
@@ -377,6 +379,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			executor,
 			metricFetcher);
 
+		final SubtaskExecutionAttemptAccumulatorsHandler subtaskExecutionAttemptAccumulatorsHandler = new SubtaskExecutionAttemptAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			null,
+			executionGraphCache,
+			executor
+		);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -419,6 +431,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler));
 		handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
 		handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
+		handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..abc8eec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests of {@link SubtaskExecutionAttemptAccumulatorsHandler}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
+
+	@Test
+	public void testHandleRequest() throws Exception {
+
+		// Instance the handler.
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
+
+		final SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(
+			CompletableFuture.completedFuture("127.0.0.1:9527"),
+			() -> null,
+			Time.milliseconds(100L),
+			restHandlerConfiguration.getResponseHeaders(),
+			null,
+			new ExecutionGraphCache(
+				restHandlerConfiguration.getTimeout(),
+				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+			TestingUtils.defaultExecutor());
+
+		// Instance a empty request.
+		final HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			new SubtaskAttemptMessageParameters()
+		);
+
+		final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(2);
+		userAccumulators.put("IntCounter", new IntCounter(10));
+		userAccumulators.put("LongCounter", new LongCounter(100L));
+
+		// Instance the expected result.
+		final StringifiedAccumulatorResult[] accumulatorResults =
+			StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
+
+		final int attemptNum = 1;
+		final int subtaskIndex = 2;
+
+		// Instance the tested execution.
+		final ArchivedExecution execution = new ArchivedExecution(
+			accumulatorResults,
+			null,
+			new ExecutionAttemptID(),
+			attemptNum,
+			ExecutionState.FINISHED,
+			null,
+			null,
+			subtaskIndex,
+			new long[ExecutionState.values().length]);
+
+		// Invoke tested method.
+		final SubtaskExecutionAttemptAccumulatorsInfo accumulatorsInfo = handler.handleRequest(request, execution);
+
+		final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(userAccumulators.size());
+		for (StringifiedAccumulatorResult accumulatorResult : accumulatorResults) {
+			userAccumulatorList.add(
+				new UserAccumulator(
+					accumulatorResult.getName(),
+					accumulatorResult.getType(),
+					accumulatorResult.getValue()));
+		}
+
+		final SubtaskExecutionAttemptAccumulatorsInfo expected = new SubtaskExecutionAttemptAccumulatorsInfo(
+			subtaskIndex,
+			attemptNum,
+			execution.getAttemptId().toString(),
+			userAccumulatorList);
+
+		// Verify.
+		assertEquals(expected, accumulatorsInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
index 21fbcef..bb5c433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -32,16 +34,16 @@ public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBa
 
 	@Override
 	protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
-		List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(3);
-		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+		List<UserAccumulator> userAccumulatorList = new ArrayList<>(3);
+		userAccumulatorList.add(new UserAccumulator(
 			"test name1",
 			"test type1",
 			"test value1"));
-		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+		userAccumulatorList.add(new UserAccumulator(
 			"test name2",
 			"test type2",
 			"test value2"));
-		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+		userAccumulatorList.add(new UserAccumulator(
 			"test name3",
 			"test type3",
 			"test value3"));

http://git-wip-us.apache.org/repos/asf/flink/blob/dc9a4f2f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
new file mode 100644
index 0000000..feea64d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsInfoTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtaskExecutionAttemptAccumulatorsInfo}.
+ */
+public class SubtaskExecutionAttemptAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtaskExecutionAttemptAccumulatorsInfo> {
+
+	@Override
+	protected Class<SubtaskExecutionAttemptAccumulatorsInfo> getTestResponseClass() {
+		return SubtaskExecutionAttemptAccumulatorsInfo.class;
+	}
+
+	@Override
+	protected SubtaskExecutionAttemptAccumulatorsInfo getTestResponseInstance() throws Exception {
+
+		final List<UserAccumulator> userAccumulatorList = new ArrayList<>();
+
+		userAccumulatorList.add(new UserAccumulator("name1", "type1", "value1"));
+		userAccumulatorList.add(new UserAccumulator("name2", "type1", "value1"));
+		userAccumulatorList.add(new UserAccumulator("name3", "type2", "value3"));
+
+		return new SubtaskExecutionAttemptAccumulatorsInfo(
+			1,
+			2,
+			new ExecutionAttemptID().toString(),
+			userAccumulatorList
+		);
+	}
+}