You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/03 14:54:54 UTC

[flink] branch release-1.10 updated: [FLINK-18772] Disable web submission for per-job/application mode deployments

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new eb0a56b  [FLINK-18772] Disable web submission for per-job/application mode deployments
eb0a56b is described below

commit eb0a56bafa241c476d1b69a7ae0e1dc269305c4d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jul 30 14:14:24 2020 +0200

    [FLINK-18772] Disable web submission for per-job/application mode deployments
    
    When running Flink in per-job/application mode, it will instantiate a MiniDispatcherRestEndpoint.
    This endpoint does not instantiate the web submission REST handlers. However, it still displayed
    the submit job link in the web ui. This commit changes the behaviour so that we no longer display
    this link when running Flink in per-job/application mode.
    
    This closes #13030.
---
 .../runtime/dispatcher/DispatcherRestEndpoint.java     | 18 +++++++++++++-----
 .../flink/runtime/webmonitor/WebMonitorEndpoint.java   | 11 ++++++++++-
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 870cb5a..c48ac7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -41,6 +41,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -96,8 +98,17 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			executor,
 			clusterConfiguration);
 
+		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
+
+		return handlers;
+	}
+
+	@Override
+	protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeWebSubmissionHandlers(CompletableFuture<String> localAddressFuture) {
 		if (restConfiguration.isWebSubmitEnabled()) {
 			try {
+				final Time timeout = restConfiguration.getTimeout();
+
 				webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
 					leaderRetriever,
 					timeout,
@@ -107,8 +118,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 					executor,
 					clusterConfiguration);
 
-				// register extension handlers
-				handlers.addAll(webSubmissionExtension.getHandlers());
+				return webSubmissionExtension.getHandlers();
 			} catch (FlinkException e) {
 				if (log.isDebugEnabled()) {
 					log.debug("Failed to load web based job submission extension.", e);
@@ -121,9 +131,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			log.info("Web-based job submission is not enabled.");
 		}
 
-		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
-
-		return handlers;
+		return Collections.emptyList();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ad98cc8..e00364e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -133,6 +133,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -207,6 +208,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
 		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
 
+		final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers = initializeWebSubmissionHandlers(localAddressFuture);
+		handlers.addAll(webSubmissionHandlers);
+		final boolean hasWebSubmissionHandlers = !webSubmissionHandlers.isEmpty();
+
 		final Time timeout = restConfiguration.getTimeout();
 
 		ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
@@ -221,7 +226,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			responseHeaders,
 			DashboardConfigurationHeaders.getInstance(),
 			restConfiguration.getRefreshInterval(),
-			restConfiguration.isWebSubmitEnabled());
+			hasWebSubmissionHandlers);
 
 		JobIdsHandler jobIdsHandler = new JobIdsHandler(
 			leaderRetriever,
@@ -664,6 +669,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		return handlers;
 	}
 
+	protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeWebSubmissionHandlers(final CompletableFuture<String> localAddressFuture) {
+		return Collections.emptyList();
+	}
+
 	@Nonnull
 	private ChannelInboundHandler createStaticFileHandler(
 			Time timeout,