You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:16 UTC

[18/30] flink git commit: [FLINK-7706] [flip6] Add JobAccumulatorsHandler for new REST endpoint

[FLINK-7706] [flip6] Add JobAccumulatorsHandler for new REST endpoint

This closes #4898.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c62c527
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c62c527
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c62c527

Branch: refs/heads/master
Commit: 0c62c5278fc5581ea3485977a562df4445cf9dc9
Parents: de201a6
Author: yew1eb <ye...@gmail.com>
Authored: Wed Oct 25 00:55:01 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:44 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  12 ++
 .../handler/job/JobAccumulatorsHandler.java     |  79 ++++++++++++
 .../rest/messages/JobAccumulatorsHeaders.java   |  73 +++++++++++
 .../rest/messages/JobAccumulatorsInfo.java      | 126 +++++++++++++++++++
 .../rest/messages/JobAccumulatorsInfoTest.java  |  52 ++++++++
 5 files changed, 342 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e13fd5b..8244d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -59,6 +60,7 @@ import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
@@ -302,6 +304,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executor,
 			metricFetcher);
 
+		JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobAccumulatorsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -331,6 +342,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
 		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
 		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
+		handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..7dd5ff0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the aggregated accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> {
+
+	public JobAccumulatorsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+	}
+
+	@Override
+	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
+		StringifiedAccumulatorResult[] accs = graph.getAccumulatorResultsStringified();
+		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(accs.length);
+
+		for (StringifiedAccumulatorResult acc : accs) {
+			userTaskAccumulators.add(
+				new JobAccumulatorsInfo.UserTaskAccumulator(
+					acc.getName(),
+					acc.getType(),
+					acc.getValue()));
+		}
+
+		return new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
new file mode 100644
index 0000000..00f4fd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> {
+
+	private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders();
+
+	public static final String URL = "/jobs" +
+		"/:" + JobIDPathParameter.KEY +
+		"/accumulators";
+
+	private JobAccumulatorsHeaders() {
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobAccumulatorsInfo> getResponseClass() {
+		return JobAccumulatorsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobAccumulatorsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
new file mode 100644
index 0000000..367a38b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsInfo implements ResponseBody {
+	public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
+	public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
+
+	@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
+	private List<JobAccumulator> jobAccumulators;
+
+	@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
+	private List<UserTaskAccumulator> userAccumulators;
+
+	@JsonCreator
+	public JobAccumulatorsInfo(
+			@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
+			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) {
+		this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators);
+		this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobAccumulatorsInfo that = (JobAccumulatorsInfo) o;
+		return Objects.equals(userAccumulators, that.userAccumulators);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(userAccumulators);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Json serializer for the {@link JobAccumulatorsInfo}.
+	 */
+	public static final class JobAccumulator {
+		// empty for now
+	}
+
+	/**
+	 * Json serializer for the {@link JobAccumulatorsInfo}.
+	 */
+	public static final class UserTaskAccumulator {
+
+		public static final String FIELD_NAME_ACC_NAME = "name";
+		public static final String FIELD_NAME_ACC_TYPE = "type";
+		public static final String FIELD_NAME_ACC_VALUE = "value";
+
+		@JsonProperty(FIELD_NAME_ACC_NAME)
+		private String name;
+
+		@JsonProperty(FIELD_NAME_ACC_TYPE)
+		private String type;
+
+		@JsonProperty(FIELD_NAME_ACC_VALUE)
+		private String value;
+
+		@JsonCreator
+		public UserTaskAccumulator(
+			@JsonProperty(FIELD_NAME_ACC_NAME) String name,
+			@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+			@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+			this.name = Preconditions.checkNotNull(name);
+			this.type = Preconditions.checkNotNull(type);
+			this.value = Preconditions.checkNotNull(value);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			UserTaskAccumulator that = (UserTaskAccumulator) o;
+			return Objects.equals(name, that.name) &&
+				Objects.equals(type, that.type) &&
+				Objects.equals(value, that.value);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(name, type, value);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
new file mode 100644
index 0000000..baaa551
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobAccumulatorsInfo} can be marshalled and unmarshalled.
+ */
+public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobAccumulatorsInfo> {
+	@Override
+	protected Class<JobAccumulatorsInfo> getTestResponseClass() {
+		return JobAccumulatorsInfo.class;
+	}
+
+	@Override
+	protected JobAccumulatorsInfo getTestResponseInstance() throws Exception {
+		List<JobAccumulatorsInfo.UserTaskAccumulator> userAccumulatorList = new ArrayList<>(3);
+		userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
+			"uta1.name",
+			"uta1.type",
+			"uta1.value"));
+		userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
+			"uta2.name",
+			"uta2.type",
+			"uta2.value"));
+		userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
+			"uta3.name",
+			"uta3.type",
+			"uta3.value"));
+
+		return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList);
+	}
+}