You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/07 14:22:51 UTC

flink git commit: [FLINK-4876] [webfrontend] Allow to bind to a specific interface

Repository: flink
Updated Branches:
  refs/heads/master a725910aa -> 718f6e4e3


[FLINK-4876] [webfrontend] Allow to bind to a specific interface

- Adds the config key 'jobmanager.web.address' to configure the listening address
- If the key is not set, Netty's default is used, which picks anyLocalAddress()

This closes #2680.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/718f6e4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/718f6e4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/718f6e4e

Branch: refs/heads/master
Commit: 718f6e4e372affd60522cb26eb02c95b7637e65d
Parents: a725910
Author: Bram Vogelaar <at...@inuits.eu>
Authored: Fri Oct 21 15:04:25 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Nov 7 15:21:07 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                                |  2 ++
 .../apache/flink/configuration/ConfigConstants.java | 11 +++++++++--
 flink-dist/src/main/resources/flink-conf.yaml       |  4 ++++
 .../flink/runtime/webmonitor/WebMonitorConfig.java  |  3 +++
 .../flink/runtime/webmonitor/WebRuntimeMonitor.java | 16 ++++++++++++----
 5 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 5791945..cc1f6bc 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -131,6 +131,8 @@ For Kafka and ZK, process-wide JAAS config will be created using the provided se
 
 - `taskmanager.log.path`: The config parameter defining the taskmanager log file location
 
+- `jobmanager.web.address`: Address of the JobManager's web interface (DEFAULT: anyLocalAddress()).
+
 - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).
 
 - `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b377e54..fb5a760 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,8 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * This class contains all constants for the configuration. That includes the configuration keys and
  * the default values.
@@ -546,7 +548,7 @@ public final class ConfigConstants {
 	
 	
 	// ------------------------- JobManager Web Frontend ----------------------
-	
+
 	/**
 	 * The port for the runtime monitor web-frontend server.
 	 */
@@ -1194,7 +1196,12 @@ public final class ConfigConstants {
 	
 	
 	// ------------------------- JobManager Web Frontend ----------------------
-	
+
+	/** The config key for the address of the JobManager web frontend. */
+	public static final ConfigOption<String> DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS =
+		key("jobmanager.web.address")
+			.noDefaultValue();
+
 	/** The config key for the port of the JobManager web frontend.
 	 * Setting this value to {@code -1} disables the web frontend. */
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 58efe12..751acda 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -60,6 +60,10 @@ parallelism.default: 1
 #==============================================================================
 # Web Frontend
 #==============================================================================
+ 
+# The address under which the web-based runtime monitor listens.
+#
+#jobmanager.web.address: 0.0.0.0
 
 # The port under which the web-based runtime monitor listens.
 # A value of -1 deactivates the web server.

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index dde6d0a..18fc5e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -61,6 +61,9 @@ public class WebMonitorConfig {
 		this.config = config;
 	}
 
+	public String getWebFrontendAddress() {
+		return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
+	}
 
 	public int getWebFrontendPort() {
 		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 99e0894..479b4ac 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import akka.actor.ActorSystem;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -159,7 +160,9 @@ public class WebRuntimeMonitor implements WebMonitor {
 		this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
 		
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
-		
+
+		final String configuredAddress = cfg.getWebFrontendAddress();
+
 		final int configuredPort = cfg.getWebFrontendPort();
 		if (configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort);
@@ -400,10 +403,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
-		Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
-		this.serverChannel = ch;
+		ChannelFuture ch;
+		if (configuredAddress == null) {
+			ch = this.bootstrap.bind(configuredPort);
+		} else {
+			ch = this.bootstrap.bind(configuredAddress, configuredPort);
+		}
+		this.serverChannel = ch.sync().channel();
 
-		InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
+		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
 		String address = bindAddress.getAddress().getHostAddress();
 		int port = bindAddress.getPort();