You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/03 14:54:54 UTC
[flink] branch release-1.10 updated: [FLINK-18772] Disable web
submission for per-job/application mode deployments
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new eb0a56b [FLINK-18772] Disable web submission for per-job/application mode deployments
eb0a56b is described below
commit eb0a56bafa241c476d1b69a7ae0e1dc269305c4d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jul 30 14:14:24 2020 +0200
[FLINK-18772] Disable web submission for per-job/application mode deployments
When running Flink in per-job/application mode, it will instantiate a MiniDispatcherRestEndpoint.
This endpoint does not instantiate the web submission REST handlers. However, it still displayed
the submit job link in the web ui. This commit changes the behaviour so that we no longer display
this link when running Flink in per-job/application mode.
This closes #13030.
---
.../runtime/dispatcher/DispatcherRestEndpoint.java | 18 +++++++++++++-----
.../flink/runtime/webmonitor/WebMonitorEndpoint.java | 11 ++++++++++-
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 870cb5a..c48ac7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -41,6 +41,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -96,8 +98,17 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
executor,
clusterConfiguration);
+ handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
+
+ return handlers;
+ }
+
+ @Override
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeWebSubmissionHandlers(CompletableFuture<String> localAddressFuture) {
if (restConfiguration.isWebSubmitEnabled()) {
try {
+ final Time timeout = restConfiguration.getTimeout();
+
webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
leaderRetriever,
timeout,
@@ -107,8 +118,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
executor,
clusterConfiguration);
- // register extension handlers
- handlers.addAll(webSubmissionExtension.getHandlers());
+ return webSubmissionExtension.getHandlers();
} catch (FlinkException e) {
if (log.isDebugEnabled()) {
log.debug("Failed to load web based job submission extension.", e);
@@ -121,9 +131,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
log.info("Web-based job submission is not enabled.");
}
- handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
-
- return handlers;
+ return Collections.emptyList();
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ad98cc8..e00364e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -133,6 +133,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -207,6 +208,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
+ final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers = initializeWebSubmissionHandlers(localAddressFuture);
+ handlers.addAll(webSubmissionHandlers);
+ final boolean hasWebSubmissionHandlers = !webSubmissionHandlers.isEmpty();
+
final Time timeout = restConfiguration.getTimeout();
ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
@@ -221,7 +226,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
responseHeaders,
DashboardConfigurationHeaders.getInstance(),
restConfiguration.getRefreshInterval(),
- restConfiguration.isWebSubmitEnabled());
+ hasWebSubmissionHandlers);
JobIdsHandler jobIdsHandler = new JobIdsHandler(
leaderRetriever,
@@ -664,6 +669,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
return handlers;
}
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeWebSubmissionHandlers(final CompletableFuture<String> localAddressFuture) {
+ return Collections.emptyList();
+ }
+
@Nonnull
private ChannelInboundHandler createStaticFileHandler(
Time timeout,