You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/07 14:22:51 UTC
flink git commit: [FLINK-4876] [webfrontend] Allow to bind to a specific interface
Repository: flink
Updated Branches:
refs/heads/master a725910aa -> 718f6e4e3
[FLINK-4876] [webfrontend] Allow to bind to a specific interface
- Adds config key 'jobmanager.web.address' to configure listening address
- Default is Netty's default, picking anyLocalAddress()
This closes #2680.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/718f6e4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/718f6e4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/718f6e4e
Branch: refs/heads/master
Commit: 718f6e4e372affd60522cb26eb02c95b7637e65d
Parents: a725910
Author: Bram Vogelaar <at...@inuits.eu>
Authored: Fri Oct 21 15:04:25 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Nov 7 15:21:07 2016 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 ++
.../apache/flink/configuration/ConfigConstants.java | 11 +++++++++--
flink-dist/src/main/resources/flink-conf.yaml | 4 ++++
.../flink/runtime/webmonitor/WebMonitorConfig.java | 3 +++
.../flink/runtime/webmonitor/WebRuntimeMonitor.java | 16 ++++++++++++----
5 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 5791945..cc1f6bc 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -131,6 +131,8 @@ For Kafka and ZK, process-wide JAAS config will be created using the provided se
- `taskmanager.log.path`: The config parameter defining the taskmanager log file location
+- `jobmanager.web.address`: Address of the JobManager's web interface (DEFAULT: anyLocalAddress()).
+
- `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).
- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface
http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b377e54..fb5a760 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,8 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import static org.apache.flink.configuration.ConfigOptions.key;
+
/**
* This class contains all constants for the configuration. That includes the configuration keys and
* the default values.
@@ -546,7 +548,7 @@ public final class ConfigConstants {
// ------------------------- JobManager Web Frontend ----------------------
-
+
/**
* The port for the runtime monitor web-frontend server.
*/
@@ -1194,7 +1196,12 @@ public final class ConfigConstants {
// ------------------------- JobManager Web Frontend ----------------------
-
+
+ /** The config key for the address of the JobManager web frontend. */
+ public static final ConfigOption<String> DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS =
+ key("jobmanager.web.address")
+ .noDefaultValue();
+
/** The config key for the port of the JobManager web frontend.
* Setting this value to {@code -1} disables the web frontend. */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 58efe12..751acda 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -60,6 +60,10 @@ parallelism.default: 1
#==============================================================================
# Web Frontend
#==============================================================================
+
+# The address under which the web-based runtime monitor listens.
+#
+#jobmanager.web.address: 0.0.0.0
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index dde6d0a..18fc5e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -61,6 +61,9 @@ public class WebMonitorConfig {
this.config = config;
}
+ public String getWebFrontendAddress() {
+ return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
+ }
public int getWebFrontendPort() {
return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 99e0894..479b4ac 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
@@ -159,7 +160,9 @@ public class WebRuntimeMonitor implements WebMonitor {
this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
final WebMonitorConfig cfg = new WebMonitorConfig(config);
-
+
+ final String configuredAddress = cfg.getWebFrontendAddress();
+
final int configuredPort = cfg.getWebFrontendPort();
if (configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort);
@@ -400,10 +403,15 @@ public class WebRuntimeMonitor implements WebMonitor {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
- Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
- this.serverChannel = ch;
+ ChannelFuture ch;
+ if (configuredAddress == null) {
+ ch = this.bootstrap.bind(configuredPort);
+ } else {
+ ch = this.bootstrap.bind(configuredAddress, configuredPort);
+ }
+ this.serverChannel = ch.sync().channel();
- InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
+ InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();