You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 17:04:04 UTC
[2/2] flink git commit: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint
[FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint
This closes #4734.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67aad88e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67aad88e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67aad88e
Branch: refs/heads/master
Commit: 67aad88ee025ce02053ab560f2504762f53b87d9
Parents: 731896a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 27 13:02:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100
----------------------------------------------------------------------
.../runtime/akka/AkkaJobManagerGateway.java | 6 +-
.../runtime/dispatcher/DispatcherGateway.java | 9 ++
.../dispatcher/DispatcherRestEndpoint.java | 10 ++
.../runtime/jobmaster/JobManagerGateway.java | 4 +-
.../flink/runtime/jobmaster/JobMaster.java | 3 +-
.../webmonitor/JobIdsWithStatusesOverview.java | 152 +++++++++++++++++++
.../webmonitor/JobsWithIDsOverview.java | 120 ---------------
.../runtime/rest/handler/job/JobIdsHandler.java | 72 +++++++++
.../handler/legacy/CurrentJobIdsHandler.java | 45 +++---
.../JobIdsWithStatusesOverviewHeaders.java | 70 +++++++++
.../flink/runtime/jobmanager/JobManager.scala | 27 ++--
.../runtime/jobmanager/MemoryArchivist.scala | 19 +--
.../messages/WebMonitorMessagesTest.java | 11 +-
.../JobIdsWithStatusesOverviewTest.java | 47 ++++++
14 files changed, 410 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 57b5dec..f7f8898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
- public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+ public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
- .mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+ .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 12cbbfb..51e9112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -79,4 +80,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
* @return A future integer of the blob server port
*/
CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
+
+ /**
+ * Request details of submitted jobs.
+ *
+ * @param timeout RPC timeout
+ * @return A future of the details of submitted jobs.
+ */
+ CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index a99151a..4556bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
@@ -170,6 +172,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
responseHeaders,
JobsOverviewHeaders.getInstance());
+ JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobIdsWithStatusesOverviewHeaders.getInstance());
+
ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
restAddressFuture,
leaderRetriever,
@@ -370,6 +379,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
+ handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 2527e46..84f42fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
* @param timeout for the asynchronous operation
* @return Future containing the job overview
*/
- CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
+ CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4535290..e2fb65f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -742,7 +743,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
@Override
- public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
new file mode 100644
index 0000000..59c1e2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of the IDs of all jobs, each paired with the job's current status.
+ */
+public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
+
+ private static final long serialVersionUID = -3699051943490133183L;
+
+ public static final String FIELD_NAME_JOB_IDS = "job-ids";
+
+ @JsonProperty(FIELD_NAME_JOB_IDS)
+ @JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
+ private final Collection<Tuple2<JobID, JobStatus>> jobIds;
+
+ @JsonCreator
+ public JobIdsWithStatusesOverview(
+ @JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
+ this.jobIds = checkNotNull(jobIds);
+ }
+
+ public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
+ this.jobIds = combine(first.getJobIds(), second.getJobIds());
+ }
+
+ public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
+ return jobIds;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return jobIds.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof JobIdsWithStatusesOverview) {
+ JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
+ return jobIds.equals(that.getJobIds());
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JobIdsWithStatusesOverview { " + jobIds + " }";
+ }
+
+ public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
+
+ private static final long serialVersionUID = 2196011372021674535L;
+
+ public JobIdWithStatusSerializer() {
+ super(Tuple2.class, true);
+ }
+
+ @Override
+ public void serialize(
+ Tuple2<JobID, JobStatus> jobIdWithStatus,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+
+ jsonGenerator.writeStartArray();
+
+ jsonGenerator.writeString(jobIdWithStatus.f0.toString());
+ jsonGenerator.writeString(jobIdWithStatus.f1.name());
+
+ jsonGenerator.writeEndArray();
+ }
+ }
+
+ public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
+
+ private static final long serialVersionUID = 5378670283978134794L;
+
+ public JobIdWithStatusDeserializer() {
+ super(Tuple2.class);
+ }
+
+ @Override
+ public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+ Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
+ JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
+
+ // read the END_ARRAY token, otherwise it will be mistaken for the end of the whole collection of ids and statuses
+ jsonParser.nextValue();
+
+ return deserializedIdWithStatus;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static Collection<Tuple2<JobID, JobStatus>> combine(
+ Collection<Tuple2<JobID, JobStatus>> first,
+ Collection<Tuple2<JobID, JobStatus>> second) {
+
+ checkNotNull(first);
+ checkNotNull(second);
+ ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
+ result.addAll(first);
+ result.addAll(second);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
deleted file mode 100644
index 4cb9d8b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobsWithIDsOverview implements InfoMessage {
-
- private static final long serialVersionUID = -3699051943490133183L;
-
- private final List<JobID> jobsRunningOrPending;
- private final List<JobID> jobsFinished;
- private final List<JobID> jobsCancelled;
- private final List<JobID> jobsFailed;
-
- public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
- List<JobID> jobsCancelled, List<JobID> jobsFailed) {
-
- this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
- this.jobsFinished = checkNotNull(jobsFinished);
- this.jobsCancelled = checkNotNull(jobsCancelled);
- this.jobsFailed = checkNotNull(jobsFailed);
- }
-
- public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
- this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
- this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
- this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
- this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
- }
-
- public List<JobID> getJobsRunningOrPending() {
- return jobsRunningOrPending;
- }
-
- public List<JobID> getJobsFinished() {
- return jobsFinished;
- }
-
- public List<JobID> getJobsCancelled() {
- return jobsCancelled;
- }
-
- public List<JobID> getJobsFailed() {
- return jobsFailed;
- }
-
- // ------------------------------------------------------------------------
-
-
- @Override
- public int hashCode() {
- return jobsRunningOrPending.hashCode() ^
- jobsFinished.hashCode() ^
- jobsCancelled.hashCode() ^
- jobsFailed.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- else if (obj instanceof JobsWithIDsOverview) {
- JobsWithIDsOverview that = (JobsWithIDsOverview) obj;
- return this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
- this.jobsFinished.equals(that.jobsFinished) &&
- this.jobsCancelled.equals(that.jobsCancelled) &&
- this.jobsFailed.equals(that.jobsFailed);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "JobsOverview {" +
- "numJobsRunningOrPending=" + jobsRunningOrPending +
- ", numJobsFinished=" + jobsFinished +
- ", numJobsCancelled=" + jobsCancelled +
- ", numJobsFailed=" + jobsFailed +
- '}';
- }
-
- // ------------------------------------------------------------------------
-
- private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
- checkNotNull(first);
- checkNotNull(second);
- ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
- result.addAll(first);
- result.addAll(second);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
new file mode 100644
index 0000000..9eea83c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Handler which returns the IDs of all jobs together with their current statuses.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobIdsHandler<T extends RestfulGateway>
+ extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+ public JobIdsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends T> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+ @Nonnull T gateway) throws RestHandlerException {
+
+ return gateway.requestJobDetails(timeout).thenApply(
+ multipleJobDetails -> new JobIdsWithStatusesOverview(
+ multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+ )
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index cbeb957..87f54fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -42,8 +45,6 @@ import static java.util.Objects.requireNonNull;
*/
public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
- private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
private final Time timeout;
public CurrentJobIdsHandler(Executor executor, Time timeout) {
@@ -53,51 +54,39 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
@Override
public String[] getPaths() {
- return new String[]{CURRENT_JOB_IDS_REST_PATH};
+ return new String[]{JobIdsWithStatusesOverviewHeaders.CURRENT_JOB_IDS_REST_PATH};
}
@Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
+
return CompletableFuture.supplyAsync(
() -> {
// we need no parameters, get all requests
try {
if (jobManagerGateway != null) {
- CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
- JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
+ gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
- gen.writeArrayFieldStart("jobs-running");
- for (JobID jid : overview.getJobsRunningOrPending()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-finished");
- for (JobID jid : overview.getJobsFinished()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-cancelled");
- for (JobID jid : overview.getJobsCancelled()) {
- gen.writeString(jid.toString());
+ final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
+ for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
+ jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
}
- gen.writeEndArray();
- gen.writeArrayFieldStart("jobs-failed");
- for (JobID jid : overview.getJobsFailed()) {
- gen.writeString(jid.toString());
- }
gen.writeEndArray();
-
gen.writeEndObject();
gen.close();
+
return writer.toString();
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
new file mode 100644
index 0000000..4dee21a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobIdsWithStatusesOverview}.
+ */
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+ public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+ private static final JobIdsWithStatusesOverviewHeaders INSTANCE = new JobIdsWithStatusesOverviewHeaders();
+
+ private JobIdsWithStatusesOverviewHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return CURRENT_JOB_IDS_REST_PATH;
+ }
+
+ @Override
+ public Class<JobIdsWithStatusesOverview> getResponseClass() {
+ return JobIdsWithStatusesOverview.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ public static JobIdsWithStatusesOverviewHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c12db23..2aeaa6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1627,8 +1627,8 @@ class JobManager(
val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
future.onSuccess {
- case archiveOverview: JobsWithIDsOverview =>
- theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+ case archiveOverview: JobIdsWithStatusesOverview =>
+ theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
}(context.dispatcher)
case _ : RequestStatusOverview =>
@@ -1694,22 +1694,17 @@ class JobManager(
new JobsOverview(runningOrPending, finished, canceled, failed)
}
- private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
- val runningOrPending = new java.util.ArrayList[JobID]()
- val finished = new java.util.ArrayList[JobID]()
- val canceled = new java.util.ArrayList[JobID]()
- val failed = new java.util.ArrayList[JobID]()
-
- currentJobs.values.foreach { case (graph, _) =>
- graph.getState() match {
- case JobStatus.FINISHED => finished.add(graph.getJobID)
- case JobStatus.CANCELED => canceled.add(graph.getJobID)
- case JobStatus.FAILED => failed.add(graph.getJobID)
- case _ => runningOrPending.add(graph.getJobID)
- }
+ private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+ val jobIdsWithStatuses =
+ new java.util.ArrayList[
+ org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+
+ currentJobs.values.foreach { job =>
+ jobIdsWithStatuses.add(
+ org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
}
- new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ new JobIdsWithStatusesOverview(jobIdsWithStatuses)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 3885596..2efdfb2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -222,22 +222,17 @@ class MemoryArchivist(
new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
}
- private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
- val runningOrPending = new util.ArrayList[JobID]()
- val finished = new util.ArrayList[JobID]()
- val canceled = new util.ArrayList[JobID]()
- val failed = new util.ArrayList[JobID]()
+ private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+ val jobIdsWithStatuses =
+ new java.util.ArrayList[
+ org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
graphs.values.foreach { graph =>
- graph.getState() match {
- case JobStatus.FINISHED => finished.add(graph.getJobID)
- case JobStatus.CANCELED => canceled.add(graph.getJobID)
- case JobStatus.FAILED => failed.add(graph.getJobID)
- case _ => runningOrPending.add(graph.getJobID)
- }
+ jobIdsWithStatuses.add(
+ org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
}
- new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ new JobIdsWithStatusesOverview(jobIdsWithStatuses)
}
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 673fa49..0221929 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,11 +19,12 @@
package org.apache.flink.runtime.messages;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
@@ -57,8 +58,12 @@ public class WebMonitorMessagesTest {
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
- GenericMessageTester.testMessageInstance(new JobsWithIDsOverview(
- randomIds(rnd), randomIds(rnd), randomIds(rnd), randomIds(rnd)));
+ GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
+ Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+ Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+ Tuple2.of(JobID.generate(), JobStatus.CREATED),
+ Tuple2.of(JobID.generate(), JobStatus.FAILED),
+ Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
new file mode 100644
index 0000000..21a20c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusesOverview} message.
+ *
+ * <p>Supplies {@link RestResponseMarshallingTestBase} with the response class under test and with
+ * a sample instance holding five freshly generated job IDs, one each in the states
+ * {@code RUNNING}, {@code CANCELED}, {@code CREATED}, {@code FAILED} and {@code RESTARTING}.
+ * The marshalling checks themselves are presumably implemented in the base class, which is not
+ * part of this file.
+ */
+public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
+
+ @Override
+ protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
+ return JobIdsWithStatusesOverview.class;
+ }
+
+ @Override
+ protected JobIdsWithStatusesOverview getTestResponseInstance() {
+ return new JobIdsWithStatusesOverview(Arrays.asList(
+ Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+ Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+ Tuple2.of(JobID.generate(), JobStatus.CREATED),
+ Tuple2.of(JobID.generate(), JobStatus.FAILED),
+ Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
+ }
+}