You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:16 UTC
[18/30] flink git commit: [FLINK-7706] [flip6] Add
JobAccumulatorsHandler for new REST endpoint
[FLINK-7706] [flip6] Add JobAccumulatorsHandler for new REST endpoint
This closes #4898.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c62c527
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c62c527
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c62c527
Branch: refs/heads/master
Commit: 0c62c5278fc5581ea3485977a562df4445cf9dc9
Parents: de201a6
Author: yew1eb <ye...@gmail.com>
Authored: Wed Oct 25 00:55:01 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:44 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 12 ++
.../handler/job/JobAccumulatorsHandler.java | 79 ++++++++++++
.../rest/messages/JobAccumulatorsHeaders.java | 73 +++++++++++
.../rest/messages/JobAccumulatorsInfo.java | 126 +++++++++++++++++++
.../rest/messages/JobAccumulatorsInfoTest.java | 52 ++++++++
5 files changed, 342 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e13fd5b..8244d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -59,6 +60,7 @@ import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
@@ -302,6 +304,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executor,
metricFetcher);
+ JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobAccumulatorsHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -331,6 +342,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
+ handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..7dd5ff0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the aggregated accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> {
+
+	/** Passes every argument, unchanged and in order, to the superclass constructor. */
+	public JobAccumulatorsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	/**
+	 * Converts the stringified accumulator results of the execution graph into the REST response.
+	 *
+	 * @param request handler request; this implementation does not read it
+	 * @param graph execution graph that supplies the stringified accumulator results
+	 * @return response with an empty job accumulator list and one user task accumulator per result
+	 */
+	@Override
+	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
+		final StringifiedAccumulatorResult[] results = graph.getAccumulatorResultsStringified();
+		final List<JobAccumulatorsInfo.UserTaskAccumulator> converted = new ArrayList<>(results.length);
+
+		for (int i = 0; i < results.length; i++) {
+			final StringifiedAccumulatorResult result = results[i];
+			converted.add(
+				new JobAccumulatorsInfo.UserTaskAccumulator(result.getName(), result.getType(), result.getValue()));
+		}
+
+		// No job-level accumulators are collected here, so that list is always empty.
+		return new JobAccumulatorsInfo(Collections.emptyList(), converted);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
new file mode 100644
index 0000000..00f4fd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> {
+
+	/** URL pattern of the endpoint: {@code /jobs/:<JobIDPathParameter.KEY>/accumulators}. */
+	public static final String URL = "/jobs" + "/:" + JobIDPathParameter.KEY + "/accumulators";
+
+	/** Shared instance handed out by {@link #getInstance()}; the constructor is private. */
+	private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders();
+
+	private JobAccumulatorsHeaders() {
+	}
+
+	public static JobAccumulatorsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobAccumulatorsInfo> getResponseClass() {
+		return JobAccumulatorsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
new file mode 100644
index 0000000..367a38b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsInfo implements ResponseBody {
+	public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
+	public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
+
+	@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
+	private final List<JobAccumulator> jobAccumulators;
+
+	@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
+	private final List<UserTaskAccumulator> userAccumulators;
+
+	/** Creates the response body; both lists are passed through {@code Preconditions.checkNotNull}. */
+	@JsonCreator
+	public JobAccumulatorsInfo(
+			@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
+			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) {
+		this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators);
+		this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobAccumulatorsInfo that = (JobAccumulatorsInfo) o;
+		// Both serialized lists define the value; hashCode() hashes the same two fields.
+		return Objects.equals(jobAccumulators, that.jobAccumulators) && Objects.equals(userAccumulators, that.userAccumulators);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jobAccumulators, userAccumulators);
+	}
+
+	// ------------------------------ Static helper classes ------------------------------
+
+	/**
+	 * Element type of the job accumulators list; it declares no fields yet.
+	 */
+	public static final class JobAccumulator {
+		// empty for now
+	}
+
+	/**
+	 * JSON representation of one user task accumulator: its name, type and value as strings.
+	 */
+	public static final class UserTaskAccumulator {
+
+		public static final String FIELD_NAME_ACC_NAME = "name";
+		public static final String FIELD_NAME_ACC_TYPE = "type";
+		public static final String FIELD_NAME_ACC_VALUE = "value";
+
+		@JsonProperty(FIELD_NAME_ACC_NAME)
+		private final String name;
+
+		@JsonProperty(FIELD_NAME_ACC_TYPE)
+		private final String type;
+
+		@JsonProperty(FIELD_NAME_ACC_VALUE)
+		private final String value;
+
+		@JsonCreator
+		public UserTaskAccumulator(
+				@JsonProperty(FIELD_NAME_ACC_NAME) String name,
+				@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+				@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+			this.name = Preconditions.checkNotNull(name);
+			this.type = Preconditions.checkNotNull(type);
+			this.value = Preconditions.checkNotNull(value);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			UserTaskAccumulator that = (UserTaskAccumulator) o;
+			return Objects.equals(name, that.name) &&
+				Objects.equals(type, that.type) &&
+				Objects.equals(value, that.value);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(name, type, value);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c62c527/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
new file mode 100644
index 0000000..baaa551
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobAccumulatorsInfo} can be marshalled and unmarshalled.
+ */
+public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobAccumulatorsInfo> {
+	/** Names {@link JobAccumulatorsInfo} as the response class under test. */
+	@Override
+	protected Class<JobAccumulatorsInfo> getTestResponseClass() {
+		return JobAccumulatorsInfo.class;
+	}
+
+	/** Builds a response with no job accumulators and three distinct user task accumulators. */
+	@Override
+	protected JobAccumulatorsInfo getTestResponseInstance() throws Exception {
+		final JobAccumulatorsInfo.UserTaskAccumulator first =
+			new JobAccumulatorsInfo.UserTaskAccumulator("uta1.name", "uta1.type", "uta1.value");
+		final JobAccumulatorsInfo.UserTaskAccumulator second =
+			new JobAccumulatorsInfo.UserTaskAccumulator("uta2.name", "uta2.type", "uta2.value");
+		final JobAccumulatorsInfo.UserTaskAccumulator third =
+			new JobAccumulatorsInfo.UserTaskAccumulator("uta3.name", "uta3.type", "uta3.value");
+
+		final List<JobAccumulatorsInfo.UserTaskAccumulator> accumulators = new ArrayList<>(3);
+		accumulators.add(first);
+		accumulators.add(second);
+		accumulators.add(third);
+
+		return new JobAccumulatorsInfo(Collections.emptyList(), accumulators);
+	}
+}