You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 17:04:03 UTC
[1/2] flink git commit: [FLINK-7652] [flip6] Introduce
JobIdWithStatus type for JobIdsWithStatusOverview
Repository: flink
Updated Branches:
refs/heads/master 731896ae7 -> 9fc3c71f8
[FLINK-7652] [flip6] Introduce JobIdWithStatus type for JobIdsWithStatusOverview
Instead of storing the JobID and the JobStatus in a Tuple2 which is serialized
as an array of values in JSON, this commit introduces the JobIdWithStatus which
is serialized as a proper JSON object with an id and a status field.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc3c71f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc3c71f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc3c71f
Branch: refs/heads/master
Commit: 9fc3c71f81a2728fd60c7ce26023c23fa2ced37a
Parents: 67aad88
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 16:00:46 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100
----------------------------------------------------------------------
.../runtime/akka/AkkaJobManagerGateway.java | 6 +-
.../runtime/dispatcher/DispatcherGateway.java | 9 --
.../dispatcher/DispatcherRestEndpoint.java | 10 +-
.../runtime/jobmaster/JobManagerGateway.java | 4 +-
.../webmonitor/JobIdsWithStatusOverview.java | 161 +++++++++++++++++++
.../webmonitor/JobIdsWithStatusesOverview.java | 152 -----------------
.../runtime/rest/handler/job/JobIdsHandler.java | 21 ++-
.../handler/legacy/CurrentJobIdsHandler.java | 16 +-
.../JobIdsWithStatusesOverviewHeaders.java | 10 +-
.../flink/runtime/jobmanager/JobManager.scala | 14 +-
.../runtime/jobmanager/MemoryArchivist.scala | 10 +-
.../messages/WebMonitorMessagesTest.java | 17 +-
.../JobIdsWithStatusOverviewTest.java | 46 ++++++
.../JobIdsWithStatusesOverviewTest.java | 47 ------
14 files changed, 262 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index f7f8898..0a465b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
- public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
+ public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
- .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
+ .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 51e9112..12cbbfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -80,12 +79,4 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
* @return A future integer of the blob server port
*/
CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
-
- /**
- * Request details of submitted jobs.
- *
- * @param timeout RPC timeout
- * @return A future of the details of submitted jobs.
- */
- CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4556bb8..44eb3da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -165,19 +165,19 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
DashboardConfigurationHeaders.getInstance(),
restConfiguration.getRefreshInterval());
- JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
+ JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
- JobsOverviewHeaders.getInstance());
+ JobIdsWithStatusesOverviewHeaders.getInstance());
- JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+ JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
- JobIdsWithStatusesOverviewHeaders.getInstance());
+ JobsOverviewHeaders.getInstance());
ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
restAddressFuture,
@@ -378,8 +378,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
- handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
+ handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 84f42fc..1695853 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
* @param timeout for the asynchronous operation
* @return Future containing the job overview
*/
- CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
+ CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
new file mode 100644
index 0000000..63c23f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobIdsWithStatusOverview implements ResponseBody, InfoMessage {
+
+ private static final long serialVersionUID = -3699051943490133183L;
+
+ public static final String FIELD_NAME_JOBS = "jobs";
+
+ @JsonProperty(FIELD_NAME_JOBS)
+ private final Collection<JobIdWithStatus> jobsWithStatus;
+
+ @JsonCreator
+ public JobIdsWithStatusOverview(
+ @JsonProperty(FIELD_NAME_JOBS) Collection<JobIdWithStatus> jobsWithStatus) {
+ this.jobsWithStatus = checkNotNull(jobsWithStatus);
+ }
+
+ public JobIdsWithStatusOverview(JobIdsWithStatusOverview first, JobIdsWithStatusOverview second) {
+ this.jobsWithStatus = combine(first.getJobsWithStatus(), second.getJobsWithStatus());
+ }
+
+ public Collection<JobIdWithStatus> getJobsWithStatus() {
+ return jobsWithStatus;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return jobsWithStatus.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof JobIdsWithStatusOverview) {
+ JobIdsWithStatusOverview that = (JobIdsWithStatusOverview) obj;
+ return jobsWithStatus.equals(that.getJobsWithStatus());
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JobIdsWithStatusesOverview { " + jobsWithStatus + " }";
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static Collection<JobIdWithStatus> combine(
+ Collection<JobIdWithStatus> first,
+ Collection<JobIdWithStatus> second) {
+ checkNotNull(first);
+ checkNotNull(second);
+
+ ArrayList<JobIdWithStatus> result = new ArrayList<>(first.size() + second.size());
+
+ result.addAll(first);
+ result.addAll(second);
+
+ return result;
+ }
+
+ // -------------------------------------------------------------------------
+ // Static classes
+ // -------------------------------------------------------------------------
+
+ public static final class JobIdWithStatus implements Serializable {
+
+ private static final long serialVersionUID = -499449819268733026L;
+
+ public static final String FIELD_NAME_JOB_ID = "id";
+
+ public static final String FIELD_NAME_JOB_STATUS = "status";
+
+ @JsonProperty(FIELD_NAME_JOB_ID)
+ @JsonSerialize(using = JobIDSerializer.class)
+ private final JobID jobId;
+
+ @JsonProperty(FIELD_NAME_JOB_STATUS)
+ private final JobStatus jobStatus;
+
+ @JsonCreator
+ public JobIdWithStatus(
+ @JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = JobIDDeserializer.class) JobID jobId,
+ @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus) {
+ this.jobId = Preconditions.checkNotNull(jobId);
+ this.jobStatus = Preconditions.checkNotNull(jobStatus);
+ }
+
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getJobStatus() {
+ return jobStatus;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobIdWithStatus that = (JobIdWithStatus) o;
+ return Objects.equals(jobId, that.jobId) &&
+ jobStatus == that.jobStatus;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId, jobStatus);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
deleted file mode 100644
index 59c1e2c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
-
- private static final long serialVersionUID = -3699051943490133183L;
-
- public static final String FIELD_NAME_JOB_IDS = "job-ids";
-
- @JsonProperty(FIELD_NAME_JOB_IDS)
- @JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
- private final Collection<Tuple2<JobID, JobStatus>> jobIds;
-
- @JsonCreator
- public JobIdsWithStatusesOverview(
- @JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
- this.jobIds = checkNotNull(jobIds);
- }
-
- public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
- this.jobIds = combine(first.getJobIds(), second.getJobIds());
- }
-
- public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
- return jobIds;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return jobIds.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- else if (obj instanceof JobIdsWithStatusesOverview) {
- JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
- return jobIds.equals(that.getJobIds());
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "JobIdsWithStatusesOverview { " + jobIds + " }";
- }
-
- public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
-
- private static final long serialVersionUID = 2196011372021674535L;
-
- public JobIdWithStatusSerializer() {
- super(Tuple2.class, true);
- }
-
- @Override
- public void serialize(
- Tuple2<JobID, JobStatus> jobIdWithStatus,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException {
-
- jsonGenerator.writeStartArray();
-
- jsonGenerator.writeString(jobIdWithStatus.f0.toString());
- jsonGenerator.writeString(jobIdWithStatus.f1.name());
-
- jsonGenerator.writeEndArray();
- }
- }
-
- public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
-
- private static final long serialVersionUID = 5378670283978134794L;
-
- public JobIdWithStatusDeserializer() {
- super(Tuple2.class);
- }
-
- @Override
- public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
- Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
- JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
-
- // read the END_ARRAY token, otherwise it will be mistaken as the end of the whole collection of ids and statuses
- jsonParser.nextValue();
-
- return deserializedIdWithStatus;
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static Collection<Tuple2<JobID, JobStatus>> combine(
- Collection<Tuple2<JobID, JobStatus>> first,
- Collection<Tuple2<JobID, JobStatus>> second) {
-
- checkNotNull(first);
- checkNotNull(second);
- ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
- result.addAll(first);
- result.addAll(second);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
index 9eea83c..0fdf18c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -19,8 +19,7 @@
package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -42,14 +41,14 @@ import java.util.stream.Collectors;
* @param <T> type of the leader gateway
*/
public class JobIdsHandler<T extends RestfulGateway>
- extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+ extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
public JobIdsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends T> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
- MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+ MessageHeaders<EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> messageHeaders) {
super(
localRestAddress,
leaderRetriever,
@@ -59,13 +58,21 @@ public class JobIdsHandler<T extends RestfulGateway>
}
@Override
- protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+ protected CompletableFuture<JobIdsWithStatusOverview> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull T gateway) throws RestHandlerException {
return gateway.requestJobDetails(timeout).thenApply(
- multipleJobDetails -> new JobIdsWithStatusesOverview(
- multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+ multipleJobDetails -> new JobIdsWithStatusOverview(
+ multipleJobDetails
+ .getJobs()
+ .stream()
+ .map(
+ jobDetails ->
+ new JobIdsWithStatusOverview.JobIdWithStatus(
+ jobDetails.getJobId(),
+ jobDetails.getStatus()))
+ .collect(Collectors.toList())
)
);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index 87f54fb..cf8a3d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -18,12 +18,9 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.util.FlinkException;
@@ -68,18 +65,17 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
// we need no parameters, get all requests
try {
if (jobManagerGateway != null) {
- CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
- JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ CompletableFuture<JobIdsWithStatusOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobIdsWithStatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
- gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
+ gen.writeArrayFieldStart(JobIdsWithStatusOverview.FIELD_NAME_JOBS);
- final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
- for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
- jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
+ for (JobIdsWithStatusOverview.JobIdWithStatus jobIdWithStatus : overview.getJobsWithStatus()) {
+ gen.writeObject(jobIdWithStatus);
}
gen.writeEndArray();
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
index 4dee21a..a9af911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -18,15 +18,15 @@
package org.apache.flink.runtime.rest.messages;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
- * Message headers for the {@link JobIdsWithStatusesOverview}.
+ * Message headers for the {@link JobIdsWithStatusOverview}.
*/
-public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
@@ -50,8 +50,8 @@ public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRe
}
@Override
- public Class<JobIdsWithStatusesOverview> getResponseClass() {
- return JobIdsWithStatusesOverview.class;
+ public Class<JobIdsWithStatusOverview> getResponseClass() {
+ return JobIdsWithStatusOverview.class;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2aeaa6e..7799a78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
@@ -1627,8 +1628,8 @@ class JobManager(
val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
future.onSuccess {
- case archiveOverview: JobIdsWithStatusesOverview =>
- theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
+ case archiveOverview: JobIdsWithStatusOverview =>
+ theSender ! new JobIdsWithStatusOverview(ourJobs, archiveOverview)
}(context.dispatcher)
case _ : RequestStatusOverview =>
@@ -1694,17 +1695,16 @@ class JobManager(
new JobsOverview(runningOrPending, finished, canceled, failed)
}
- private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+ private def createJobStatusWithIDsOverview() : JobIdsWithStatusOverview = {
val jobIdsWithStatuses =
- new java.util.ArrayList[
- org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+ new java.util.ArrayList[JobIdWithStatus](currentJobs.size)
currentJobs.values.foreach { job =>
jobIdsWithStatuses.add(
- org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
+ new JobIdWithStatus(job._1.getJobID, job._1.getState))
}
- new JobIdsWithStatusesOverview(jobIdsWithStatuses)
+ new JobIdsWithStatusOverview(jobIdsWithStatuses)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2efdfb2..de42ab2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
import scala.collection.mutable
import scala.concurrent.future
@@ -222,17 +223,16 @@ class MemoryArchivist(
new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
}
- private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+ private def createJobsWithIDsOverview() : JobIdsWithStatusOverview = {
val jobIdsWithStatuses =
- new java.util.ArrayList[
- org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
+ new java.util.ArrayList[JobIdWithStatus](graphs.size)
graphs.values.foreach { graph =>
jobIdsWithStatuses.add(
- org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
+ new JobIdWithStatus(graph.getJobID, graph.getState))
}
- new JobIdsWithStatusesOverview(jobIdsWithStatuses)
+ new JobIdsWithStatusOverview(jobIdsWithStatuses)
}
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 0221929..c56f4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,18 +19,17 @@
package org.apache.flink.runtime.messages;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.junit.Test;
@@ -58,12 +57,12 @@ public class WebMonitorMessagesTest {
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
- GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
- Tuple2.of(JobID.generate(), JobStatus.RUNNING),
- Tuple2.of(JobID.generate(), JobStatus.CANCELED),
- Tuple2.of(JobID.generate(), JobStatus.CREATED),
- Tuple2.of(JobID.generate(), JobStatus.FAILED),
- Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
+ GenericMessageTester.testMessageInstance(new JobIdsWithStatusOverview(Arrays.asList(
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RUNNING),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CANCELED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CREATED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.FAILED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RESTARTING))));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
new file mode 100644
index 0000000..faf9cc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusOverview} message.
+ */
+public class JobIdsWithStatusOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusOverview> {
+
+ @Override
+ protected Class<JobIdsWithStatusOverview> getTestResponseClass() {
+ return JobIdsWithStatusOverview.class;
+ }
+
+ @Override
+ protected JobIdsWithStatusOverview getTestResponseInstance() {
+ return new JobIdsWithStatusOverview(Arrays.asList(
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RUNNING),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CANCELED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CREATED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.FAILED),
+ new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RESTARTING)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
deleted file mode 100644
index 21a20c9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
-
-import java.util.Arrays;
-
-/**
- * Marshalling test for the {@link JobIdsWithStatusesOverview} message.
- */
-public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
-
- @Override
- protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
- return JobIdsWithStatusesOverview.class;
- }
-
- @Override
- protected JobIdsWithStatusesOverview getTestResponseInstance() {
- return new JobIdsWithStatusesOverview(Arrays.asList(
- Tuple2.of(JobID.generate(), JobStatus.RUNNING),
- Tuple2.of(JobID.generate(), JobStatus.CANCELED),
- Tuple2.of(JobID.generate(), JobStatus.CREATED),
- Tuple2.of(JobID.generate(), JobStatus.FAILED),
- Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
- }
-}
[2/2] flink git commit: [FLINK-7652] [flip6] Port
CurrentJobIdsHandler to new REST endpoint
Posted by tr...@apache.org.
[FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint
This closes #4734.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67aad88e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67aad88e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67aad88e
Branch: refs/heads/master
Commit: 67aad88ee025ce02053ab560f2504762f53b87d9
Parents: 731896a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 27 13:02:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100
----------------------------------------------------------------------
.../runtime/akka/AkkaJobManagerGateway.java | 6 +-
.../runtime/dispatcher/DispatcherGateway.java | 9 ++
.../dispatcher/DispatcherRestEndpoint.java | 10 ++
.../runtime/jobmaster/JobManagerGateway.java | 4 +-
.../flink/runtime/jobmaster/JobMaster.java | 3 +-
.../webmonitor/JobIdsWithStatusesOverview.java | 152 +++++++++++++++++++
.../webmonitor/JobsWithIDsOverview.java | 120 ---------------
.../runtime/rest/handler/job/JobIdsHandler.java | 72 +++++++++
.../handler/legacy/CurrentJobIdsHandler.java | 45 +++---
.../JobIdsWithStatusesOverviewHeaders.java | 70 +++++++++
.../flink/runtime/jobmanager/JobManager.scala | 27 ++--
.../runtime/jobmanager/MemoryArchivist.scala | 19 +--
.../messages/WebMonitorMessagesTest.java | 11 +-
.../JobIdsWithStatusesOverviewTest.java | 47 ++++++
14 files changed, 410 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 57b5dec..f7f8898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
- public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+ public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
- .mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+ .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 12cbbfb..51e9112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -79,4 +80,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
* @return A future integer of the blob server port
*/
CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
+
+ /**
+ * Request details of submitted jobs.
+ *
+ * @param timeout RPC timeout
+ * @return A future of the details of submitted jobs.
+ */
+ CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index a99151a..4556bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
@@ -170,6 +172,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
responseHeaders,
JobsOverviewHeaders.getInstance());
+ JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobIdsWithStatusesOverviewHeaders.getInstance());
+
ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
restAddressFuture,
leaderRetriever,
@@ -370,6 +379,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
+ handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 2527e46..84f42fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
* @param timeout for the asynchronous operation
* @return Future containing the job overview
*/
- CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
+ CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4535290..e2fb65f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -742,7 +743,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
@Override
- public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
new file mode 100644
index 0000000..59c1e2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
+
+ private static final long serialVersionUID = -3699051943490133183L;
+
+ public static final String FIELD_NAME_JOB_IDS = "job-ids";
+
+ @JsonProperty(FIELD_NAME_JOB_IDS)
+ @JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
+ private final Collection<Tuple2<JobID, JobStatus>> jobIds;
+
+ @JsonCreator
+ public JobIdsWithStatusesOverview(
+ @JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
+ this.jobIds = checkNotNull(jobIds);
+ }
+
+ public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
+ this.jobIds = combine(first.getJobIds(), second.getJobIds());
+ }
+
+ public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
+ return jobIds;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return jobIds.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof JobIdsWithStatusesOverview) {
+ JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
+ return jobIds.equals(that.getJobIds());
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JobIdsWithStatusesOverview { " + jobIds + " }";
+ }
+
+ public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
+
+ private static final long serialVersionUID = 2196011372021674535L;
+
+ public JobIdWithStatusSerializer() {
+ super(Tuple2.class, true);
+ }
+
+ @Override
+ public void serialize(
+ Tuple2<JobID, JobStatus> jobIdWithStatus,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+
+ jsonGenerator.writeStartArray();
+
+ jsonGenerator.writeString(jobIdWithStatus.f0.toString());
+ jsonGenerator.writeString(jobIdWithStatus.f1.name());
+
+ jsonGenerator.writeEndArray();
+ }
+ }
+
+ public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
+
+ private static final long serialVersionUID = 5378670283978134794L;
+
+ public JobIdWithStatusDeserializer() {
+ super(Tuple2.class);
+ }
+
+ @Override
+ public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+ Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
+ JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
+
+ // read the END_ARRAY token, otherwise it will be mistaken as the end of the whole collection of ids and statuses
+ jsonParser.nextValue();
+
+ return deserializedIdWithStatus;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static Collection<Tuple2<JobID, JobStatus>> combine(
+ Collection<Tuple2<JobID, JobStatus>> first,
+ Collection<Tuple2<JobID, JobStatus>> second) {
+
+ checkNotNull(first);
+ checkNotNull(second);
+ ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
+ result.addAll(first);
+ result.addAll(second);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
deleted file mode 100644
index 4cb9d8b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobsWithIDsOverview implements InfoMessage {
-
- private static final long serialVersionUID = -3699051943490133183L;
-
- private final List<JobID> jobsRunningOrPending;
- private final List<JobID> jobsFinished;
- private final List<JobID> jobsCancelled;
- private final List<JobID> jobsFailed;
-
- public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
- List<JobID> jobsCancelled, List<JobID> jobsFailed) {
-
- this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
- this.jobsFinished = checkNotNull(jobsFinished);
- this.jobsCancelled = checkNotNull(jobsCancelled);
- this.jobsFailed = checkNotNull(jobsFailed);
- }
-
- public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
- this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
- this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
- this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
- this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
- }
-
- public List<JobID> getJobsRunningOrPending() {
- return jobsRunningOrPending;
- }
-
- public List<JobID> getJobsFinished() {
- return jobsFinished;
- }
-
- public List<JobID> getJobsCancelled() {
- return jobsCancelled;
- }
-
- public List<JobID> getJobsFailed() {
- return jobsFailed;
- }
-
- // ------------------------------------------------------------------------
-
-
- @Override
- public int hashCode() {
- return jobsRunningOrPending.hashCode() ^
- jobsFinished.hashCode() ^
- jobsCancelled.hashCode() ^
- jobsFailed.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- else if (obj instanceof JobsWithIDsOverview) {
- JobsWithIDsOverview that = (JobsWithIDsOverview) obj;
- return this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
- this.jobsFinished.equals(that.jobsFinished) &&
- this.jobsCancelled.equals(that.jobsCancelled) &&
- this.jobsFailed.equals(that.jobsFailed);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "JobsOverview {" +
- "numJobsRunningOrPending=" + jobsRunningOrPending +
- ", numJobsFinished=" + jobsFinished +
- ", numJobsCancelled=" + jobsCancelled +
- ", numJobsFailed=" + jobsFailed +
- '}';
- }
-
- // ------------------------------------------------------------------------
-
- private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
- checkNotNull(first);
- checkNotNull(second);
- ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
- result.addAll(first);
- result.addAll(second);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
new file mode 100644
index 0000000..9eea83c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for job IDs.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobIdsHandler<T extends RestfulGateway>
+ extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+ public JobIdsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends T> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+ @Nonnull T gateway) throws RestHandlerException {
+
+ return gateway.requestJobDetails(timeout).thenApply(
+ multipleJobDetails -> new JobIdsWithStatusesOverview(
+ multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+ )
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index cbeb957..87f54fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -42,8 +45,6 @@ import static java.util.Objects.requireNonNull;
*/
public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
- private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
private final Time timeout;
public CurrentJobIdsHandler(Executor executor, Time timeout) {
@@ -53,51 +54,39 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
@Override
public String[] getPaths() {
- return new String[]{CURRENT_JOB_IDS_REST_PATH};
+ return new String[]{JobIdsWithStatusesOverviewHeaders.CURRENT_JOB_IDS_REST_PATH};
}
@Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
+
return CompletableFuture.supplyAsync(
() -> {
// we need no parameters, get all requests
try {
if (jobManagerGateway != null) {
- CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
- JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
+ gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
- gen.writeArrayFieldStart("jobs-running");
- for (JobID jid : overview.getJobsRunningOrPending()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-finished");
- for (JobID jid : overview.getJobsFinished()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-cancelled");
- for (JobID jid : overview.getJobsCancelled()) {
- gen.writeString(jid.toString());
+ final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
+ for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
+ jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
}
- gen.writeEndArray();
- gen.writeArrayFieldStart("jobs-failed");
- for (JobID jid : overview.getJobsFailed()) {
- gen.writeString(jid.toString());
- }
gen.writeEndArray();
-
gen.writeEndObject();
gen.close();
+
return writer.toString();
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
new file mode 100644
index 0000000..4dee21a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobIdsWithStatusesOverview}.
+ */
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+ public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+ private static final JobIdsWithStatusesOverviewHeaders INSTANCE = new JobIdsWithStatusesOverviewHeaders();
+
+ private JobIdsWithStatusesOverviewHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return CURRENT_JOB_IDS_REST_PATH;
+ }
+
+ @Override
+ public Class<JobIdsWithStatusesOverview> getResponseClass() {
+ return JobIdsWithStatusesOverview.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ public static JobIdsWithStatusesOverviewHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c12db23..2aeaa6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1627,8 +1627,8 @@ class JobManager(
val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
future.onSuccess {
- case archiveOverview: JobsWithIDsOverview =>
- theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+ case archiveOverview: JobIdsWithStatusesOverview =>
+ theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
}(context.dispatcher)
case _ : RequestStatusOverview =>
@@ -1694,22 +1694,17 @@ class JobManager(
new JobsOverview(runningOrPending, finished, canceled, failed)
}
- private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
- val runningOrPending = new java.util.ArrayList[JobID]()
- val finished = new java.util.ArrayList[JobID]()
- val canceled = new java.util.ArrayList[JobID]()
- val failed = new java.util.ArrayList[JobID]()
-
- currentJobs.values.foreach { case (graph, _) =>
- graph.getState() match {
- case JobStatus.FINISHED => finished.add(graph.getJobID)
- case JobStatus.CANCELED => canceled.add(graph.getJobID)
- case JobStatus.FAILED => failed.add(graph.getJobID)
- case _ => runningOrPending.add(graph.getJobID)
- }
+ private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+ val jobIdsWithStatuses =
+ new java.util.ArrayList[
+ org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+
+ currentJobs.values.foreach { job =>
+ jobIdsWithStatuses.add(
+ org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
}
- new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ new JobIdsWithStatusesOverview(jobIdsWithStatuses)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 3885596..2efdfb2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -222,22 +222,17 @@ class MemoryArchivist(
new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
}
- private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
- val runningOrPending = new util.ArrayList[JobID]()
- val finished = new util.ArrayList[JobID]()
- val canceled = new util.ArrayList[JobID]()
- val failed = new util.ArrayList[JobID]()
+ private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+ val jobIdsWithStatuses =
+ new java.util.ArrayList[
+ org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
graphs.values.foreach { graph =>
- graph.getState() match {
- case JobStatus.FINISHED => finished.add(graph.getJobID)
- case JobStatus.CANCELED => canceled.add(graph.getJobID)
- case JobStatus.FAILED => failed.add(graph.getJobID)
- case _ => runningOrPending.add(graph.getJobID)
- }
+ jobIdsWithStatuses.add(
+ org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
}
- new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ new JobIdsWithStatusesOverview(jobIdsWithStatuses)
}
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 673fa49..0221929 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,11 +19,12 @@
package org.apache.flink.runtime.messages;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
@@ -57,8 +58,12 @@ public class WebMonitorMessagesTest {
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
- GenericMessageTester.testMessageInstance(new JobsWithIDsOverview(
- randomIds(rnd), randomIds(rnd), randomIds(rnd), randomIds(rnd)));
+ GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
+ Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+ Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+ Tuple2.of(JobID.generate(), JobStatus.CREATED),
+ Tuple2.of(JobID.generate(), JobStatus.FAILED),
+ Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
new file mode 100644
index 0000000..21a20c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusesOverview} message.
+ */
+public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
+
+ @Override
+ protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
+ return JobIdsWithStatusesOverview.class;
+ }
+
+ @Override
+ protected JobIdsWithStatusesOverview getTestResponseInstance() {
+ return new JobIdsWithStatusesOverview(Arrays.asList(
+ Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+ Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+ Tuple2.of(JobID.generate(), JobStatus.CREATED),
+ Tuple2.of(JobID.generate(), JobStatus.FAILED),
+ Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
+ }
+}