You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:17 UTC
[19/30] flink git commit: [FLINK-7806] [flip6] Register
CurrentJobsOverviewHandler under /jobs/overview
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 08946ed..57b5dec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -223,7 +223,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
- public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+ public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index cf3405b..51d7755 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -330,7 +330,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
@Override
- public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+ public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
final int numberJobsRunning = jobManagerRunners.size();
ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8244d95..4de77b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -37,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatis
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
-import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
@@ -56,7 +55,6 @@ import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -66,6 +64,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -162,17 +161,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executor,
restConfiguration.getRefreshInterval()));
- LegacyRestHandlerAdapter<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>(
+ JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
- CurrentJobsOverviewHandlerHeaders.getInstance(),
- new CurrentJobsOverviewHandler(
- executor,
- timeout,
- true,
- true));
+ JobsOverviewHeaders.getInstance());
LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
@@ -331,7 +325,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
- handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
+ handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
new file mode 100644
index 0000000..46aa887
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Overview handler for jobs.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobsOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+ public JobsOverviewHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends T> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> messageHeaders) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
+ return gateway.requestJobDetails(timeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 98e95c0..30e7fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -41,7 +42,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
-import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -63,7 +63,7 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
@Override
public String[] getPaths() {
- return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+ return new String[]{ClusterOverviewHeaders.URL};
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
deleted file mode 100644
index 27edac8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns a summary of the job status.
- */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> {
-
- private static final String ALL_JOBS_REST_PATH = "/joboverview";
- private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
- private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
-
- private final Time timeout;
-
- private final boolean includeRunningJobs;
- private final boolean includeFinishedJobs;
-
- public CurrentJobsOverviewHandler(
- Executor executor,
- Time timeout,
- boolean includeRunningJobs,
- boolean includeFinishedJobs) {
-
- super(executor);
- this.timeout = checkNotNull(timeout);
- this.includeRunningJobs = includeRunningJobs;
- this.includeFinishedJobs = includeFinishedJobs;
- }
-
- @Override
- public CompletableFuture<MultipleJobsDetails> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
- return gateway.requestJobDetails(true, true, timeout);
- }
-
- @Override
- public String[] getPaths() {
- if (includeRunningJobs && includeFinishedJobs) {
- return new String[]{ALL_JOBS_REST_PATH};
- }
- if (includeRunningJobs) {
- return new String[]{RUNNING_JOBS_REST_PATH};
- } else {
- return new String[]{COMPLETED_JOBS_REST_PATH};
- }
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- if (jobManagerGateway != null) {
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-
- return jobDetailsFuture.thenApplyAsync(
- (MultipleJobsDetails result) -> {
- final long now = System.currentTimeMillis();
-
- StringWriter writer = new StringWriter();
- try {
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
-
- gen.writeStartObject();
-
- if (includeRunningJobs && includeFinishedJobs) {
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
- for (JobDetails detail : result.getRunning()) {
- jobDetailsSerializer.serialize(detail, gen, null);
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
- for (JobDetails detail : result.getFinished()) {
- jobDetailsSerializer.serialize(detail, gen, null);
- }
- gen.writeEndArray();
- } else {
- gen.writeArrayFieldStart("jobs");
- for (JobDetails detail : includeRunningJobs ? result.getRunning() : result.getFinished()) {
- jobDetailsSerializer.serialize(detail, gen, null);
- }
- gen.writeEndArray();
- }
-
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- } catch (IOException e) {
- throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
- }
- },
- executor);
- }
- else {
- return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
- }
- }
-
- /**
- * Archivist for the CurrentJobsOverviewHandler.
- */
- public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- StringWriter writer = new StringWriter();
- try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
- gen.writeStartObject();
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
- gen.writeEndArray();
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
-
- final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
- jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
-
- gen.writeEndArray();
- gen.writeEndObject();
- }
- String json = writer.toString();
- String path = ALL_JOBS_REST_PATH;
- return Collections.singleton(new ArchivedJson(path, json));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
new file mode 100644
index 0000000..6aa75b2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns a summary of the job status.
+ */
+public class JobsOverviewHandler extends AbstractJsonRequestHandler {
+
+ private static final String ALL_JOBS_REST_PATH = JobsOverviewHeaders.URL;
+
+ private final Time timeout;
+
+ public JobsOverviewHandler(
+ Executor executor,
+ Time timeout) {
+
+ super(executor);
+ this.timeout = checkNotNull(timeout);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{ALL_JOBS_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ if (jobManagerGateway != null) {
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(timeout);
+
+ return jobDetailsFuture.thenApplyAsync(
+ (MultipleJobsDetails result) -> {
+
+ StringWriter writer = new StringWriter();
+ try {
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+
+ gen.writeStartObject();
+
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
+ for (JobDetails detail : result.getRunning()) {
+ jobDetailsSerializer.serialize(detail, gen, null);
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+ for (JobDetails detail : result.getFinished()) {
+ jobDetailsSerializer.serialize(detail, gen, null);
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
+ } catch (IOException e) {
+ throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
+ }
+ },
+ executor);
+ }
+ else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+ }
+ }
+
+ /**
+ * Archivist for the JobsOverviewHandler.
+ */
+ public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ StringWriter writer = new StringWriter();
+ try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+ gen.writeStartObject();
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
+ gen.writeEndArray();
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+
+ final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+ jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
+
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
+ String json = writer.toString();
+ String path = ALL_JOBS_REST_PATH;
+ return Collections.singleton(new ArchivedJson(path, json));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index e71a1d7..e89ed55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -103,13 +103,10 @@ public class MetricFetcher<T extends RestfulGateway> {
if (optionalLeaderGateway.isPresent()) {
final T leaderGateway = optionalLeaderGateway.get();
- /**
+ /*
* Remove all metrics that belong to a job that is not running and no longer archived.
*/
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(
- true,
- true,
- timeout);
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(timeout);
jobDetailsFuture.whenCompleteAsync(
(MultipleJobsDetails jobDetails, Throwable throwable) -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index 688be79..b6ae0f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -31,7 +31,7 @@ public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequest
private static final ClusterOverviewHeaders INSTANCE = new ClusterOverviewHeaders();
- public static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+ public static final String URL = "/overview";
// make this class a singleton
private ClusterOverviewHeaders() {}
@@ -48,7 +48,7 @@ public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequest
@Override
public String getTargetRestEndpointURL() {
- return CLUSTER_OVERVIEW_REST_PATH;
+ return URL;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
deleted file mode 100644
index f97c601..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for {@link CurrentJobsOverviewHandler}.
- */
-public final class CurrentJobsOverviewHandlerHeaders implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
-
- private static final CurrentJobsOverviewHandlerHeaders INSTANCE = new CurrentJobsOverviewHandlerHeaders();
-
- // make this class a singleton
- private CurrentJobsOverviewHandlerHeaders() {}
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.GET;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return "/joboverview";
- }
-
- @Override
- public Class<MultipleJobsDetails> getResponseClass() {
- return MultipleJobsDetails.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public EmptyMessageParameters getUnresolvedMessageParameters() {
- return EmptyMessageParameters.getInstance();
- }
-
- public static CurrentJobsOverviewHandlerHeaders getInstance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
new file mode 100644
index 0000000..5be841c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link JobsOverviewHandler}.
+ */
+public final class JobsOverviewHeaders implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+ private static final JobsOverviewHeaders INSTANCE = new JobsOverviewHeaders();
+
+ public static final String URL = "/jobs/overview";
+
+ // make this class a singleton
+ private JobsOverviewHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Class<MultipleJobsDetails> getResponseClass() {
+ return MultipleJobsDetails.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ public static JobsOverviewHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 331e96b..d5de8e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -63,14 +63,10 @@ public interface RestfulGateway extends RpcGateway {
/**
* Requests job details currently being executed on the Flink cluster.
*
- * @param includeRunning true if running jobs shall be included, otherwise false
- * @param includeFinished true if finished jobs shall be included, otherwise false
* @param timeout for the asynchronous operation
* @return Future containing the job details
*/
CompletableFuture<MultipleJobsDetails> requestJobDetails(
- boolean includeRunning,
- boolean includeFinished,
@RpcTimeout Time timeout);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
deleted file mode 100644
index c9f91d4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-
-/**
- * Tests for the CurrentJobsOverviewHandler.
- */
-public class CurrentJobsOverviewHandlerTest {
-
- @Test
- public void testArchiver() throws Exception {
- JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
- AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
- JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
-
- Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
- Assert.assertEquals(1, archives.size());
-
- ArchivedJson archive = archives.iterator().next();
- Assert.assertEquals("/joboverview", archive.getPath());
-
- JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
- ArrayNode running = (ArrayNode) result.get("running");
- Assert.assertEquals(0, running.size());
-
- ArrayNode finished = (ArrayNode) result.get("finished");
- Assert.assertEquals(1, finished.size());
-
- compareJobOverview(expectedDetails, finished.get(0).toString());
- }
-
- @Test
- public void testGetPaths() {
- CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
- String[] pathsAll = handlerAll.getPaths();
- Assert.assertEquals(1, pathsAll.length);
- Assert.assertEquals("/joboverview", pathsAll[0]);
-
- CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
- String[] pathsRunning = handlerRunning.getPaths();
- Assert.assertEquals(1, pathsRunning.length);
- Assert.assertEquals("/joboverview/running", pathsRunning[0]);
-
- CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
- String[] pathsCompleted = handlerCompleted.getPaths();
- Assert.assertEquals(1, pathsCompleted.length);
- Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
- }
-
- @Test
- public void testJsonGeneration() throws Exception {
- AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
- JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
- StringWriter writer = new StringWriter();
- try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
- JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer();
- serializer.serialize(expectedDetails, gen, null);
- }
- compareJobOverview(expectedDetails, writer.toString());
- }
-
- private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
- JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
-
- Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
- Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
- Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
-
- Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
- Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
- Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
- Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
-
- JsonNode tasks = result.get("tasks");
- Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
- int[] tasksPerState = expectedDetails.getTasksPerState();
-
- for (ExecutionState executionState : ExecutionState.values()) {
- Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
new file mode 100644
index 0000000..ea40376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+
+/**
+ * Tests for the {@link JobsOverviewHandler}.
+ */
+public class JobsOverviewHandlerTest extends TestLogger {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals(JobsOverviewHeaders.URL, archive.getPath());
+
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
+ ArrayNode running = (ArrayNode) result.get("running");
+ Assert.assertEquals(0, running.size());
+
+ ArrayNode finished = (ArrayNode) result.get("finished");
+ Assert.assertEquals(1, finished.size());
+
+ compareJobOverview(expectedDetails, finished.get(0).toString());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobsOverviewHandler handlerAll = new JobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
+ String[] pathsAll = handlerAll.getPaths();
+ Assert.assertEquals(1, pathsAll.length);
+ Assert.assertEquals(JobsOverviewHeaders.URL, pathsAll[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+ StringWriter writer = new StringWriter();
+ try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
+ JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer();
+ serializer.serialize(expectedDetails, gen, null);
+ }
+ compareJobOverview(expectedDetails, writer.toString());
+ }
+
+ private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+ Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
+ Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
+ Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
+
+ Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
+ Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
+ Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
+ Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
+
+ JsonNode tasks = result.get("tasks");
+ Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
+ int[] tasksPerState = expectedDetails.getTasksPerState();
+
+ for (ExecutionState executionState : ExecutionState.values()) {
+ Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index faa97a7..e157b0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -82,7 +81,7 @@ public class MetricFetcherTest extends TestLogger {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
+ when(jobManagerGateway.requestJobDetails(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));