You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/09 09:51:14 UTC
[2/4] flink git commit: [FLINK-8024] Let ClusterOverviewHandler directly extend from AbstractRestHandler
[FLINK-8024] Let ClusterOverviewHandler directly extend from AbstractRestHandler
This closes #4982.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f03393e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f03393e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f03393e8
Branch: refs/heads/master
Commit: f03393e807f51e6496f5bd54771987a64287b154
Parents: 34fdf56
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 6 18:48:53 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 9 10:48:39 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 10 +--
.../handler/cluster/ClusterOverviewHandler.java | 65 ++++++++++++++++++++
.../handler/legacy/ClusterOverviewHandler.java | 15 +----
3 files changed, 69 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f03393e8/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index a479749..1a0c584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
@@ -44,12 +45,10 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCach
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
@@ -143,15 +142,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
final Time timeout = restConfiguration.getTimeout();
final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
- LegacyRestHandlerAdapter<DispatcherGateway, ClusterOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
+ ClusterOverviewHandler<DispatcherGateway> clusterOverviewHandler = new ClusterOverviewHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
- ClusterOverviewHeaders.getInstance(),
- new ClusterOverviewHandler(
- executor,
- timeout));
+ ClusterOverviewHeaders.getInstance());
LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
http://git-wip-us.apache.org/repos/asf/flink/blob/f03393e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
new file mode 100644
index 0000000..19b94c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.cluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which returns the cluster overview information with version.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class ClusterOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> {
+
+ private static final String version = EnvironmentInformation.getVersion();
+
+ private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
+
+ public ClusterOverviewHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<T> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> messageHeaders) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+ }
+
+ @Override
+ public CompletableFuture<ClusterOverviewWithVersion> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) {
+ CompletableFuture<ClusterOverview> overviewFuture = gateway.requestClusterOverview(timeout);
+
+ return overviewFuture.thenApply(
+ statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f03393e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 30e7fec..0a26084 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -20,16 +20,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.FlinkException;
@@ -48,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Responder that returns the status of the Flink cluster, such as how many
* TaskManagers are currently connected, and how many jobs are running.
*/
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, ClusterOverviewWithVersion, EmptyMessageParameters> {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
private static final String version = EnvironmentInformation.getVersion();
@@ -108,12 +103,4 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
}
}
-
- @Override
- public CompletableFuture<ClusterOverviewWithVersion> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
- CompletableFuture<ClusterOverview> overviewFuture = gateway.requestClusterOverview(timeout);
-
- return overviewFuture.thenApply(
- statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
- }
}