You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/30 16:58:38 UTC

[1/3] flink git commit: [FLINK-8027] Generalize existing rest handlers to work with an arbitrary RestfulGateway

Repository: flink
Updated Branches:
  refs/heads/master a744d4bf3 -> f0e82dca3


[FLINK-8027] Generalize existing rest handlers to work with an arbitrary RestfulGateway

By letting the existing REST handlers work with an arbitrary RestfulGateway,
they can be used by the Dispatcher as well as the JobMaster, once it implements
the RestfulGateway.

This closes #4985.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0bd65db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0bd65db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0bd65db

Branch: refs/heads/master
Commit: b0bd65db9bb9d41ad85634d08dc8b9dae2cfa8dd
Parents: a744d4b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 18:09:56 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 30 17:57:21 2017 +0100

----------------------------------------------------------------------
 .../runtime/dispatcher/DispatcherRestEndpoint.java    | 14 +++++++-------
 .../rest/handler/cluster/ClusterConfigHandler.java    |  8 +++-----
 .../rest/handler/cluster/ClusterOverviewHandler.java  |  8 +++-----
 .../rest/handler/cluster/DashboardConfigHandler.java  |  8 +++-----
 .../flink/runtime/rest/handler/job/JobIdsHandler.java |  9 +++------
 .../runtime/rest/handler/job/JobsOverviewHandler.java |  8 +++-----
 .../handler/job/metrics/AbstractMetricsHandler.java   |  8 ++++----
 .../handler/job/metrics/JobManagerMetricsHandler.java |  4 ++--
 .../rest/handler/job/metrics/JobMetricsHandler.java   |  4 ++--
 .../handler/job/metrics/JobVertexMetricsHandler.java  |  4 ++--
 .../handler/job/metrics/SubtaskMetricsHandler.java    |  4 ++--
 .../job/metrics/TaskManagerMetricsHandler.java        |  4 ++--
 .../taskmanager/TaskManagerDetailsHandler.java        |  6 ++----
 .../rest/handler/taskmanager/TaskManagersHandler.java |  4 ++--
 14 files changed, 40 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 44eb3da..4a99b9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -150,14 +150,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		final Time timeout = restConfiguration.getTimeout();
 		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
 
-		ClusterOverviewHandler<DispatcherGateway> clusterOverviewHandler = new ClusterOverviewHandler<>(
+		ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
 			ClusterOverviewHeaders.getInstance());
 
-		DashboardConfigHandler<DispatcherGateway> dashboardConfigHandler = new DashboardConfigHandler<>(
+		DashboardConfigHandler dashboardConfigHandler = new DashboardConfigHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
@@ -165,21 +165,21 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			DashboardConfigurationHeaders.getInstance(),
 			restConfiguration.getRefreshInterval());
 
-		JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+		JobIdsHandler jobIdsHandler = new JobIdsHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
 			JobIdsWithStatusesOverviewHeaders.getInstance());
 
-		JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
+		JobsOverviewHandler jobsOverviewHandler = new JobsOverviewHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
 			JobsOverviewHeaders.getInstance());
 
-		ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
+		ClusterConfigHandler clusterConfigurationHandler = new ClusterConfigHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
@@ -280,7 +280,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			timeout,
 			responseHeaders);
 
-		TaskManagersHandler<DispatcherGateway> taskManagersHandler = new TaskManagersHandler<>(
+		TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
@@ -288,7 +288,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			TaskManagersHeaders.getInstance(),
 			resourceManagerRetriever);
 
-		TaskManagerDetailsHandler<DispatcherGateway> taskManagerDetailsHandler = new TaskManagerDetailsHandler<>(
+		TaskManagerDetailsHandler taskManagerDetailsHandler = new TaskManagerDetailsHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
index ef95c61..f0a30ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
@@ -38,16 +38,14 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Handler which serves the cluster's configuration.
- *
- * @param <T> type of the leader gateway
  */
-public class ClusterConfigHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> {
+public class ClusterConfigHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> {
 
 	private final ClusterConfigurationInfo clusterConfig;
 
 	public ClusterConfigHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> messageHeaders,
@@ -59,7 +57,7 @@ public class ClusterConfigHandler<T extends RestfulGateway> extends AbstractRest
 	}
 
 	@Override
-	protected CompletableFuture<ClusterConfigurationInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
+	protected CompletableFuture<ClusterConfigurationInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
 		return CompletableFuture.completedFuture(clusterConfig);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
index 19b94c5..524c25b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
@@ -37,10 +37,8 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Handler which returns the cluster overview information with version.
- *
- * @param <T> type of the leader gateway
  */
-public class ClusterOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> {
+public class ClusterOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> {
 
 	private static final String version = EnvironmentInformation.getVersion();
 
@@ -48,7 +46,7 @@ public class ClusterOverviewHandler<T extends RestfulGateway> extends AbstractRe
 
 	public ClusterOverviewHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> messageHeaders) {
@@ -56,7 +54,7 @@ public class ClusterOverviewHandler<T extends RestfulGateway> extends AbstractRe
 	}
 
 	@Override
-	public CompletableFuture<ClusterOverviewWithVersion> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) {
+	public CompletableFuture<ClusterOverviewWithVersion> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) {
 		CompletableFuture<ClusterOverview> overviewFuture = gateway.requestClusterOverview(timeout);
 
 		return overviewFuture.thenApply(

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
index 785cd72..c67fc59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
@@ -36,16 +36,14 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Handler which returns the dashboard configuration.
- *
- * @param <T> type of the leader gateway
  */
-public class DashboardConfigHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
+public class DashboardConfigHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
 
 	private final DashboardConfiguration dashboardConfiguration;
 
 	public DashboardConfigHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> messageHeaders,
@@ -56,7 +54,7 @@ public class DashboardConfigHandler<T extends RestfulGateway> extends AbstractRe
 	}
 
 	@Override
-	public CompletableFuture<DashboardConfiguration> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) {
+	public CompletableFuture<DashboardConfiguration> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) {
 		return CompletableFuture.completedFuture(dashboardConfiguration);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
index 0fdf18c..0a510f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -37,15 +37,12 @@ import java.util.stream.Collectors;
 
 /**
  * Handler for job IDs.
- *
- * @param <T> type of the leader gateway
  */
-public class JobIdsHandler<T extends RestfulGateway>
-		extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
+public class JobIdsHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
 
 	public JobIdsHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> messageHeaders) {
@@ -60,7 +57,7 @@ public class JobIdsHandler<T extends RestfulGateway>
 	@Override
 	protected CompletableFuture<JobIdsWithStatusOverview> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
-			@Nonnull T gateway) throws RestHandlerException {
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
 
 		return gateway.requestJobDetails(timeout).thenApply(
 			multipleJobDetails -> new JobIdsWithStatusOverview(

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 46aa887..7b6be28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -36,14 +36,12 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Overview handler for jobs.
- *
- * @param <T> type of the leader gateway
  */
-public class JobsOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
 
 	public JobsOverviewHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> messageHeaders) {
@@ -56,7 +54,7 @@ public class JobsOverviewHandler<T extends RestfulGateway> extends AbstractRestH
 	}
 
 	@Override
-	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
+	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
 		return gateway.requestJobDetails(timeout);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
index 54ef081..ff5d1b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
@@ -61,13 +61,13 @@ import static java.util.Objects.requireNonNull;
  * @param <M> Type of the concrete {@link MessageParameters}
  */
 public abstract class AbstractMetricsHandler<M extends MessageParameters> extends
-	AbstractRestHandler<DispatcherGateway, EmptyRequestBody, MetricCollectionResponseBody, M> {
+	AbstractRestHandler<RestfulGateway, EmptyRequestBody, MetricCollectionResponseBody, M> {
 
 	private final MetricFetcher metricFetcher;
 
 	public AbstractMetricsHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> headers,
 			MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> messageHeaders,
@@ -79,7 +79,7 @@ public abstract class AbstractMetricsHandler<M extends MessageParameters> extend
 	@Override
 	protected final CompletableFuture<MetricCollectionResponseBody> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, M> request,
-			@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
 		metricFetcher.update();
 
 		final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore(

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
index 0d953c6..83d4b13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nullable;
@@ -40,7 +40,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler<JobManagerM
 
 	public JobManagerMetricsHandler(
 			final CompletableFuture<String> localRestAddress,
-			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> headers,
 			final MetricFetcher metricFetcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
index 6a6c197..077d005 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nullable;
@@ -41,7 +41,7 @@ public class JobMetricsHandler extends AbstractMetricsHandler<JobMetricsMessageP
 
 	public JobMetricsHandler(
 			final CompletableFuture<String> localRestAddress,
-			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> headers,
 			final MetricFetcher metricFetcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
index f8f4702..840ab02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import java.util.Map;
@@ -46,7 +46,7 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMet
 
 	public JobVertexMetricsHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> headers,
 			MetricFetcher metricFetcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
index cb3d864..4c719c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nullable;
@@ -47,7 +47,7 @@ public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetrics
 
 	public SubtaskMetricsHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> headers,
 			MetricFetcher metricFetcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
index 9a284d7..d428ae7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nullable;
@@ -44,7 +44,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler<TaskManage
 
 	public TaskManagerMetricsHandler(
 			final CompletableFuture<String> localRestAddress,
-			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> headers,
 			final MetricFetcher metricFetcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
index e66a61a..9429a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -45,17 +45,15 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Handler which serves detailed TaskManager information.
- *
- * @param <T> type of the owning {@link RestfulGateway}
  */
-public class TaskManagerDetailsHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
+public class TaskManagerDetailsHandler extends AbstractTaskManagerHandler<RestfulGateway, EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
 
 	private final MetricFetcher metricFetcher;
 	private final MetricStore metricStore;
 
 	public TaskManagerDetailsHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> messageHeaders,

http://git-wip-us.apache.org/repos/asf/flink/blob/b0bd65db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
index c8a5c8d..f34876e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
@@ -37,11 +37,11 @@ import java.util.concurrent.CompletableFuture;
 /**
  * Returns an overview over all registered TaskManagers of the cluster.
  */
-public class TaskManagersHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+public class TaskManagersHandler extends AbstractTaskManagerHandler<RestfulGateway, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
 
 	public TaskManagersHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<T> leaderRetriever,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,


[2/3] flink git commit: [hotfix] Rename RestfulGateway#requestJobDetails into requestMultipleJobDetails to avoid name conflicts with JobMasterGateway

Posted by tr...@apache.org.
[hotfix] Rename RestfulGateway#requestJobDetails into requestMultipleJobDetails to avoid name conflicts with JobMasterGateway


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcbc9668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcbc9668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcbc9668

Branch: refs/heads/master
Commit: dcbc9668c7f07e6c60cc3032aeff37ab416a8204
Parents: b0bd65d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 11:08:40 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 30 17:57:32 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java  | 2 +-
 .../main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java  | 2 +-
 .../java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java  | 2 +-
 .../org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java   | 2 +-
 .../apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java | 2 +-
 .../flink/runtime/rest/handler/legacy/JobsOverviewHandler.java     | 2 +-
 .../flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java   | 2 +-
 .../java/org/apache/flink/runtime/webmonitor/RestfulGateway.java   | 2 +-
 .../runtime/rest/handler/legacy/metrics/MetricFetcherTest.java     | 2 +-
 9 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 0a465b9..37a27c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -223,7 +223,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
+	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
 				.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 8e33421..b22e7ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -330,7 +330,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
-	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
+	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
 		final int numberJobsRunning = jobManagerRunners.size();
 
 		ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 2c7e438..f469993 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
index 0a510f4..cabeb40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -59,7 +59,7 @@ public class JobIdsHandler extends AbstractRestHandler<RestfulGateway, EmptyRequ
 			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
 			@Nonnull RestfulGateway gateway) throws RestHandlerException {
 
-		return gateway.requestJobDetails(timeout).thenApply(
+		return gateway.requestMultipleJobDetails(timeout).thenApply(
 			multipleJobDetails -> new JobIdsWithStatusOverview(
 				multipleJobDetails
 					.getJobs()

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 7b6be28..94bdbd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -55,6 +55,6 @@ public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, Emp
 
 	@Override
 	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-		return gateway.requestJobDetails(timeout);
+		return gateway.requestMultipleJobDetails(timeout);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
index d2b4a6c..bf8a013 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
@@ -68,7 +68,7 @@ public class JobsOverviewHandler extends AbstractJsonRequestHandler {
 	@Override
 	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		if (jobManagerGateway != null) {
-			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(timeout);
+			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestMultipleJobDetails(timeout);
 
 			return jobDetailsFuture.thenApplyAsync(
 				(MultipleJobsDetails result) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 313232f..44a6169 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -106,7 +106,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 				/*
 				 * Remove all metrics that belong to a job that is not running and no longer archived.
 				 */
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(timeout);
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout);
 
 				jobDetailsFuture.whenCompleteAsync(
 					(MultipleJobsDetails jobDetails, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index d5de8e9..61dca3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -66,7 +66,7 @@ public interface RestfulGateway extends RpcGateway {
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the job details
 	 */
-	CompletableFuture<MultipleJobsDetails> requestJobDetails(
+	CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(
 		@RpcTimeout Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/dcbc9668/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 72d18b0..9562b58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -81,7 +81,7 @@ public class MetricFetcherTest extends TestLogger {
 
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
-		when(jobManagerGateway.requestJobDetails(any(Time.class)))
+		when(jobManagerGateway.requestMultipleJobDetails(any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
 		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));


[3/3] flink git commit: [hotfix] Pass in Rest address to Dispatcher as nullable String

Posted by tr...@apache.org.
[hotfix] Pass in Rest address to Dispatcher as nullable String


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0e82dca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0e82dca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0e82dca

Branch: refs/heads/master
Commit: f0e82dca3b81f75718c8cdabbb586595170a4f1a
Parents: dcbc966
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:59:28 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 30 17:57:36 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java      | 18 +++++++++++-------
 .../runtime/dispatcher/StandaloneDispatcher.java  |  4 ++--
 .../entrypoint/SessionClusterEntrypoint.java      |  7 ++++---
 .../flink/runtime/dispatcher/DispatcherTest.java  |  3 +--
 4 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b22e7ab..1fa0f7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -56,13 +56,14 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -93,7 +94,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final LeaderElectionService leaderElectionService;
 
-	private final CompletableFuture<String> restAddressFuture;
+	@Nullable
+	protected final String restAddress;
 
 	protected Dispatcher(
 			RpcService rpcService,
@@ -105,7 +107,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
-			Optional<String> restAddress) throws Exception {
+			@Nullable String restAddress) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -124,10 +126,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		jobManagerRunners = new HashMap<>(16);
 
 		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
-		this.restAddressFuture = restAddress
-			.map(CompletableFuture::completedFuture)
-			.orElse(FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));
 
+		this.restAddress = restAddress;
 	}
 
 	//------------------------------------------------------
@@ -275,7 +275,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<String> requestRestAddress(Time timeout) {
-		return restAddressFuture;
+		if (restAddress != null) {
+			return CompletableFuture.completedFuture(restAddress);
+		} else {
+			return FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint."));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5a6889e..3ba681c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import java.util.Optional;
+import javax.annotation.Nullable;
 
 /**
  * Dispatcher implementation which spawns a {@link JobMaster} for each
@@ -51,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
-			Optional<String> restAddress) throws Exception {
+			@Nullable String restAddress) throws Exception {
 		super(
 			rpcService,
 			endpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 3feb005..27ddf49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -49,7 +49,8 @@ import org.apache.flink.util.FlinkException;
 
 import akka.actor.ActorSystem;
 
-import java.util.Optional;
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Executor;
 
 /**
@@ -130,7 +131,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			heartbeatServices,
 			metricRegistry,
 			this,
-			Optional.of(dispatcherRestEndpoint.getRestAddress()));
+			dispatcherRestEndpoint.getRestAddress());
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
@@ -214,7 +215,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
 		FatalErrorHandler fatalErrorHandler,
-		Optional<String> restAddress) throws Exception {
+		@Nullable String restAddress) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a511d45..d5b63d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -51,7 +51,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -224,7 +223,7 @@ public class DispatcherTest extends TestLogger {
 				heartbeatServices,
 				metricRegistry,
 				fatalErrorHandler,
-				Optional.empty());
+				null);
 
 			this.jobManagerRunner = jobManagerRunner;
 			this.expectedJobId = expectedJobId;