You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/20 13:10:10 UTC

[1/2] flink git commit: [FLINK-7855] [flip6] Add JobVertexMessageParameters

Repository: flink
Updated Branches:
  refs/heads/master 142ff78c3 -> ebc3bc1f9


[FLINK-7855] [flip6] Add JobVertexMessageParameters

This closes #4857.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebc3bc1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebc3bc1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebc3bc1f

Branch: refs/heads/master
Commit: ebc3bc1f9ce9dcbd69fedc2ec79ab03d94d99cef
Parents: 1348839
Author: Till <ti...@gmail.com>
Authored: Fri Oct 20 15:05:11 2017 +0200
Committer: Till <ti...@gmail.com>
Committed: Fri Oct 20 15:09:23 2017 +0200

----------------------------------------------------------------------
 .../job/JobVertexAccumulatorsHandler.java       | 22 ++++++-----
 .../rest/messages/JobIDPathParameter.java       |  4 +-
 .../messages/JobVertexAccumulatorsHeaders.java  | 12 ++++--
 .../messages/JobVertexAccumulatorsInfo.java     | 17 ++++----
 .../rest/messages/JobVertexIdPathParameter.java |  2 +-
 .../messages/JobVertexMessageParameters.java    | 41 ++++++++++++++++++++
 .../messages/JobVertexAccumulatorsInfoTest.java |  2 +-
 7 files changed, 74 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
index e89052b..f7c9b49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -37,37 +37,39 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
  * Request handler for the job vertex accumulators.
  */
-public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobMessageParameters> {
+public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobVertexMessageParameters> {
 
 	public JobVertexAccumulatorsHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
-			MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> messageHeaders,
+			MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobVertexMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
 	}
 
 	@Override
-	protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+	protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
 		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+
 		if (null != jobVertex) {
 			StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
-			List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+			ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+
 			for (StringifiedAccumulatorResult acc : accs) {
-				userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
-					acc.getName(),
-					acc.getType(),
-					acc.getValue()));
+				userAccumulatorList.add(
+					new JobVertexAccumulatorsInfo.UserAccumulator(
+						acc.getName(),
+						acc.getType(),
+						acc.getValue()));
 			}
 
 			return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
index a4ae0f2..fabcd4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.common.JobID;
  */
 public class JobIDPathParameter extends MessagePathParameter<JobID> {
 
-	private static final String JOB_ID = "jobid";
+	public static final String KEY = "jobid";
 
 	public JobIDPathParameter() {
-		super(JOB_ID);
+		super(KEY);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
index 0e1179f..350dfc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
@@ -26,11 +26,15 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /**
  * Message headers for the {@link JobVertexAccumulatorsHandler}.
  */
-public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> {
+public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobVertexMessageParameters> {
 
 	private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders();
 
-	public static final String URL = "/jobs/:jobid/vertices/:vertexid/accumulators";
+	public static final String URL = "/jobs" +
+		"/:" + JobIDPathParameter.KEY +
+		"/vertices" +
+		"/:" + JobVertexIdPathParameter.KEY +
+		"/accumulators";
 
 	private JobVertexAccumulatorsHeaders() {}
 
@@ -50,8 +54,8 @@ public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequest
 	}
 
 	@Override
-	public JobMessageParameters getUnresolvedMessageParameters() {
-		return new JobMessageParameters();
+	public JobVertexMessageParameters getUnresolvedMessageParameters() {
+		return new JobVertexMessageParameters();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
index 3f64a06..80d1b4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Objects;
 
 /**
@@ -38,14 +39,14 @@ public class JobVertexAccumulatorsInfo implements ResponseBody {
 	private String id;
 
 	@JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
-	private List<UserAccumulator> userAccumulatorList;
+	private Collection<UserAccumulator> userAccumulatorList;
 
 	@JsonCreator
 	public JobVertexAccumulatorsInfo(
 			@JsonProperty(FIELD_NAME_ID) String id,
-			@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) List<UserAccumulator> userAccumulatorList) {
-		this.id = id;
-		this.userAccumulatorList = userAccumulatorList;
+			@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulatorList) {
+		this.id = Preconditions.checkNotNull(id);
+		this.userAccumulatorList = Preconditions.checkNotNull(userAccumulatorList);
 	}
 
 	@Override
@@ -93,9 +94,9 @@ public class JobVertexAccumulatorsInfo implements ResponseBody {
 				@JsonProperty(FIELD_NAME_ACC_NAME) String name,
 				@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
 				@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
-			this.name = name;
-			this.type = type;
-			this.value = value;
+			this.name = Preconditions.checkNotNull(name);
+			this.type = Preconditions.checkNotNull(type);
+			this.value = Preconditions.checkNotNull(value);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java
index 0e000f6..6f9a4ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexIdPathParameter.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  */
 public class JobVertexIdPathParameter extends MessagePathParameter<JobVertexID> {
 
-	private static final String KEY = "vertexid";
+	public static final String KEY = "vertexid";
 
 	public JobVertexIdPathParameter() {
 		super(KEY);

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexMessageParameters.java
new file mode 100644
index 0000000..042f6a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameters for job vertex REST handlers.
+ */
+public class JobVertexMessageParameters extends JobMessageParameters {
+
+	protected final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Arrays.asList(jobPathParameter, jobVertexIdPathParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptySet();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc3bc1f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
index fd4fe78..21fbcef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
@@ -32,7 +32,7 @@ public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBa
 
 	@Override
 	protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
-		List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+		List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(3);
 		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
 			"test name1",
 			"test type1",


[2/2] flink git commit: [FLINK-7855] [flip6] Port JobVertexAccumulatorsHandler to REST endpoint

Posted by tr...@apache.org.
[FLINK-7855] [flip6] Port JobVertexAccumulatorsHandler to REST endpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13488396
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13488396
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13488396

Branch: refs/heads/master
Commit: 13488396f73f157373c576fc23d5c3fb12592215
Parents: 142ff78
Author: zjureel <zj...@gmail.com>
Authored: Thu Oct 19 18:23:13 2017 +0800
Committer: Till <ti...@gmail.com>
Committed: Fri Oct 20 15:09:23 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../job/JobVertexAccumulatorsHandler.java       |  78 ++++++++++++
 .../messages/JobVertexAccumulatorsHeaders.java  |  70 +++++++++++
 .../messages/JobVertexAccumulatorsInfo.java     | 120 +++++++++++++++++++
 .../messages/JobVertexAccumulatorsInfoTest.java |  51 ++++++++
 5 files changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index ac4897b..447cc0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -217,6 +219,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
+		JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			JobVertexAccumulatorsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -244,6 +254,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
 		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
 		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
+		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
 
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
new file mode 100644
index 0000000..e89052b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex accumulators.
+ */
+public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobMessageParameters> {
+
+	public JobVertexAccumulatorsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+		if (null != jobVertex) {
+			StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+			List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+			for (StringifiedAccumulatorResult acc : accs) {
+				userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+					acc.getName(),
+					acc.getType(),
+					acc.getValue()));
+			}
+
+			return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
+		} else {
+			throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
new file mode 100644
index 0000000..0e1179f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexAccumulatorsHandler}.
+ */
+public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> {
+
+	private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders();
+
+	public static final String URL = "/jobs/:jobid/vertices/:vertexid/accumulators";
+
+	private JobVertexAccumulatorsHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobVertexAccumulatorsInfo> getResponseClass() {
+		return JobVertexAccumulatorsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobVertexAccumulatorsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
new file mode 100644
index 0000000..3f64a06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexAccumulatorsHandler}.
+ */
+public class JobVertexAccumulatorsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_ID = "id";
+	public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+	@JsonProperty(FIELD_NAME_ID)
+	private String id;
+
+	@JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+	private List<UserAccumulator> userAccumulatorList;
+
+	@JsonCreator
+	public JobVertexAccumulatorsInfo(
+			@JsonProperty(FIELD_NAME_ID) String id,
+			@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) List<UserAccumulator> userAccumulatorList) {
+		this.id = id;
+		this.userAccumulatorList = userAccumulatorList;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobVertexAccumulatorsInfo that = (JobVertexAccumulatorsInfo) o;
+		return Objects.equals(id, that.id) &&
+			Objects.equals(userAccumulatorList, that.userAccumulatorList);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id, userAccumulatorList);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Json serializer for the {@link JobVertexAccumulatorsInfo}.
+	 */
+	public static final class UserAccumulator {
+
+		public static final String FIELD_NAME_ACC_NAME = "name";
+		public static final String FIELD_NAME_ACC_TYPE = "type";
+		public static final String FIELD_NAME_ACC_VALUE = "value";
+
+		@JsonProperty(FIELD_NAME_ACC_NAME)
+		private String name;
+
+		@JsonProperty(FIELD_NAME_ACC_TYPE)
+		private String type;
+
+		@JsonProperty(FIELD_NAME_ACC_VALUE)
+		private String value;
+
+		@JsonCreator
+		public UserAccumulator(
+				@JsonProperty(FIELD_NAME_ACC_NAME) String name,
+				@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+				@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+			this.name = name;
+			this.type = type;
+			this.value = value;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			UserAccumulator that = (UserAccumulator) o;
+			return Objects.equals(name, that.name) &&
+				Objects.equals(type, that.type) &&
+				Objects.equals(value, that.value);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(name, type, value);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
new file mode 100644
index 0000000..fd4fe78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobVertexAccumulatorsInfo} can be marshalled and unmarshalled.
+ */
+public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobVertexAccumulatorsInfo> {
+	@Override
+	protected Class<JobVertexAccumulatorsInfo> getTestResponseClass() {
+		return JobVertexAccumulatorsInfo.class;
+	}
+
+	@Override
+	protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
+		List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+			"test name1",
+			"test type1",
+			"test value1"));
+		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+			"test name2",
+			"test type2",
+			"test value2"));
+		userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+			"test name3",
+			"test type3",
+			"test value3"));
+
+		return new JobVertexAccumulatorsInfo("testId", userAccumulatorList);
+	}
+}