You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/09 09:51:15 UTC

[3/4] flink git commit: [FLINK-8026] Let ClusterConfigHandler directly extend AbstractRestHandler

[FLINK-8026] Let ClusterConfigHandler directly extend AbstractRestHandler

This closes #4984.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/541fe436
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/541fe436
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/541fe436

Branch: refs/heads/master
Commit: 541fe43663d2a24b1ae66bc2b5228c49dfd43e7b
Parents: fa967df
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 10:56:33 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 9 10:48:39 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 11 +---
 .../handler/cluster/ClusterConfigHandler.java   | 65 ++++++++++++++++++++
 .../handler/legacy/ClusterConfigHandler.java    | 19 +-----
 3 files changed, 69 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/541fe436/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 96af2eb..3b262c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
@@ -45,18 +45,15 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
@@ -163,15 +160,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			JobsOverviewHeaders.getInstance());
 
-		LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
+		ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
 			ClusterConfigurationInfoHeaders.getInstance(),
-			new ClusterConfigHandler(
-				executor,
-				clusterConfiguration));
+			clusterConfiguration);
 
 		JobTerminationHandler jobTerminationHandler = new JobTerminationHandler(
 			restAddressFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/541fe436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
new file mode 100644
index 0000000..ef95c61
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.cluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which serves the cluster's configuration.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class ClusterConfigHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> {
+
+	private final ClusterConfigurationInfo clusterConfig;
+
+	public ClusterConfigHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> messageHeaders,
+			Configuration configuration) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+		Preconditions.checkNotNull(configuration);
+		this.clusterConfig = ClusterConfigurationInfo.from(configuration);
+	}
+
+	@Override
+	protected CompletableFuture<ClusterConfigurationInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
+		return CompletableFuture.completedFuture(clusterConfig);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/541fe436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
index efdc9c9..76221b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
@@ -19,15 +19,9 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -43,17 +37,14 @@ import java.util.concurrent.Executor;
 /**
  * Returns the Job Manager's configuration.
  */
-public class ClusterConfigHandler extends AbstractJsonRequestHandler
-		implements LegacyRestHandler<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> {
+public class ClusterConfigHandler extends AbstractJsonRequestHandler {
 
-	private final ClusterConfigurationInfo clusterConfig;
 	private final String clusterConfigJson;
 
 	public ClusterConfigHandler(Executor executor, Configuration config) {
 		super(executor);
 
 		Preconditions.checkNotNull(config);
-		this.clusterConfig = ClusterConfigurationInfo.from(config);
 		this.clusterConfigJson = createConfigJson(config);
 	}
 
@@ -63,14 +54,6 @@ public class ClusterConfigHandler extends AbstractJsonRequestHandler
 	}
 
 	@Override
-	public CompletableFuture<ClusterConfigurationInfo> handleRequest(
-			HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
-			DispatcherGateway gateway) {
-
-		return CompletableFuture.completedFuture(clusterConfig);
-	}
-
-	@Override
 	public CompletableFuture<String> handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,