You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/20 13:10:11 UTC
[2/2] flink git commit: [FLINK-7855] [flip6] Port
JobVertexAccumulatorsHandler to REST endpoint
[FLINK-7855] [flip6] Port JobVertexAccumulatorsHandler to REST endpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13488396
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13488396
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13488396
Branch: refs/heads/master
Commit: 13488396f73f157373c576fc23d5c3fb12592215
Parents: 142ff78
Author: zjureel <zj...@gmail.com>
Authored: Thu Oct 19 18:23:13 2017 +0800
Committer: Till <ti...@gmail.com>
Committed: Fri Oct 20 15:09:23 2017 +0200
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 11 ++
.../job/JobVertexAccumulatorsHandler.java | 78 ++++++++++++
.../messages/JobVertexAccumulatorsHeaders.java | 70 +++++++++++
.../messages/JobVertexAccumulatorsInfo.java | 120 +++++++++++++++++++
.../messages/JobVertexAccumulatorsInfoTest.java | 51 ++++++++
5 files changed, 330 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index ac4897b..447cc0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -217,6 +219,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
+ JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ JobVertexAccumulatorsHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -244,6 +254,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
+ handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
new file mode 100644
index 0000000..e89052b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex accumulators.
+ */
+public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobMessageParameters> {
+
+ public JobVertexAccumulatorsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+ }
+
+ @Override
+ protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+ JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+ AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+ if (null != jobVertex) {
+ StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+ List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+ for (StringifiedAccumulatorResult acc : accs) {
+ userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ acc.getName(),
+ acc.getType(),
+ acc.getValue()));
+ }
+
+ return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
+ } else {
+ throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
new file mode 100644
index 0000000..0e1179f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexAccumulatorsHandler}.
+ */
+public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> {
+
+ private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders();
+
+ public static final String URL = "/jobs/:jobid/vertices/:vertexid/accumulators";
+
+ private JobVertexAccumulatorsHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<JobVertexAccumulatorsInfo> getResponseClass() {
+ return JobVertexAccumulatorsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static JobVertexAccumulatorsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
new file mode 100644
index 0000000..3f64a06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfo.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexAccumulatorsHandler}.
+ */
+public class JobVertexAccumulatorsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_ID = "id";
+ public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+ @JsonProperty(FIELD_NAME_ID)
+ private String id;
+
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+ private List<UserAccumulator> userAccumulatorList;
+
+ @JsonCreator
+ public JobVertexAccumulatorsInfo(
+ @JsonProperty(FIELD_NAME_ID) String id,
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) List<UserAccumulator> userAccumulatorList) {
+ this.id = id;
+ this.userAccumulatorList = userAccumulatorList;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobVertexAccumulatorsInfo that = (JobVertexAccumulatorsInfo) o;
+ return Objects.equals(id, that.id) &&
+ Objects.equals(userAccumulatorList, that.userAccumulatorList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, userAccumulatorList);
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+
+ /**
+ * Json serializer for the {@link JobVertexAccumulatorsInfo}.
+ */
+ public static final class UserAccumulator {
+
+ public static final String FIELD_NAME_ACC_NAME = "name";
+ public static final String FIELD_NAME_ACC_TYPE = "type";
+ public static final String FIELD_NAME_ACC_VALUE = "value";
+
+ @JsonProperty(FIELD_NAME_ACC_NAME)
+ private String name;
+
+ @JsonProperty(FIELD_NAME_ACC_TYPE)
+ private String type;
+
+ @JsonProperty(FIELD_NAME_ACC_VALUE)
+ private String value;
+
+ @JsonCreator
+ public UserAccumulator(
+ @JsonProperty(FIELD_NAME_ACC_NAME) String name,
+ @JsonProperty(FIELD_NAME_ACC_TYPE) String type,
+ @JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
+ this.name = name;
+ this.type = type;
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UserAccumulator that = (UserAccumulator) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(type, that.type) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/13488396/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
new file mode 100644
index 0000000..fd4fe78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobVertexAccumulatorsInfo} can be marshalled and unmarshalled.
+ */
+public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobVertexAccumulatorsInfo> {
+ @Override
+ protected Class<JobVertexAccumulatorsInfo> getTestResponseClass() {
+ return JobVertexAccumulatorsInfo.class;
+ }
+
+ @Override
+ protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
+ List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
+ userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ "test name1",
+ "test type1",
+ "test value1"));
+ userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ "test name2",
+ "test type2",
+ "test value2"));
+ userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
+ "test name3",
+ "test type3",
+ "test value3"));
+
+ return new JobVertexAccumulatorsInfo("testId", userAccumulatorList);
+ }
+}