You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/18 15:10:21 UTC

[GitHub] zentol closed pull request #3391: [FLINK-5758] [yarn] support port range for web monitor

zentol closed pull request #3391: [FLINK-5758] [yarn] support port range for web monitor
URL: https://github.com/apache/flink/pull/3391
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4618da9c7f..4d9a13f1ba6 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -347,7 +347,7 @@ These parameters allow for advanced tuning. The default values are sufficient wh
 
 ### JobManager Web Frontend
 
-- `jobmanager.web.port`: Port of the JobManager's web interface that displays status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 8081). Setting this value to `-1` disables the web frontend.
+- `jobmanager.web.port`: Port of the JobManager's web interface that displays status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 8081). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. Setting this value to `-1` disables the web frontend.
 
 - `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end in its history (DEFAULT: 5).
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc96367..52a5388f8bf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -49,8 +49,8 @@ public void waitForClusterToBeReady() {}
 	@Override
 	public String getWebInterfaceURL() {
 		String host = this.getJobManagerAddress().getHostString();
-		int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		int port = Integer.parseInt(getFlinkConfiguration().getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT));
 		return "http://" +  host + ":" + port;
 	}
 
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 0a0d06e086a..457f0b76643 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -165,6 +165,7 @@ under the License.
 							<exclude>org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT</exclude>
 						</excludes>
 					</parameter>
 				</configuration>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5129f20eb2d..75ca44a7013 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -566,7 +566,12 @@
 	// ------------------------- JobManager Web Frontend ----------------------
 
 	/**
-	 * The port for the runtime monitor web-frontend server.
+	 * The config parameter defining the server port of the webmonitor.
+	 * The port can either be a port, such as "9123",
+	 * a range of ports: "50100-50200"
+	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+	 *
+	 * Setting the port to 0 will let the OS choose an available port.
 	 */
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
@@ -1252,8 +1257,8 @@
 			.noDefaultValue();
 
 	/** The config key for the port of the JobManager web frontend.
-	 * Setting this value to {@code -1} disables the web frontend. */
-	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
+	 * Setting this value to {@code "-1"} disables the web frontend. */
+	public static final String DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = "8081";
 
 	/** Default value to override SSL support for the JobManager web UI */
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d4437e41fdd..6f06b762ee9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -34,6 +34,7 @@
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.net.BindException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -372,6 +373,46 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
 		return null;
 	}
 
+	/**
+	 * Tries to create a server from the given sets of ports.
+	 *
+	 * @param address An address to listen at.
+	 * @param portRange A set of ports to choose from.
+	 * @param serverFactory A factory for creating server.
+	 * @return the created server.
+	 * @throws BindException If port range is exhausted.
+	 */
+	public static <T> T createServerFromPorts(String address, Iterator<Integer> portRange, ServerFactory<T> serverFactory) throws Exception {
+
+		while (portRange.hasNext()) {
+			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
+				portRange,
+				new NetUtils.SocketFactory() {
+					@Override
+					public ServerSocket createSocket(int port) throws IOException {
+						return new ServerSocket(port);
+					}
+				});
+
+			int port;
+			if (availableSocket == null) {
+				throw new BindException("Could not start server on any port in port range: " + portRange);
+			} else {
+				port = availableSocket.getLocalPort();
+				try {
+					availableSocket.close();
+				} catch (IOException ignored) {}
+			}
+
+			try {
+				return serverFactory.create(address, port);
+			} catch (BindException e) {}
+		}
+
+		throw new BindException("Could not start server on any port in port range: " + portRange);
+	}
+
+
 	/**
 	 * Returns the wildcard address to listen on all interfaces.
 	 * @return Either 0.0.0.0 or :: depending on the IP setup.
@@ -383,4 +424,8 @@ public static String getWildcardIPAddress() {
 	public interface SocketFactory {
 		ServerSocket createSocket(int port) throws IOException;
 	}
+
+	public interface ServerFactory<T> {
+		T create(String address, int port) throws Exception;
+	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 709ef094018..bfaf8a76c24 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1236,8 +1236,8 @@ public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration
 		checkNotNull(conf, "conf");
 
 		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+			String port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+			conf.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
 		}
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index a23c9f6767f..832ab18942b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -310,7 +310,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
-			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, LOG);
 			if(webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
 				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 18fc5e825d8..03bf3f64938 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -21,6 +21,9 @@
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
+
+import java.util.Iterator;
 
 public class WebMonitorConfig {
 
@@ -40,7 +43,7 @@
 	// ------------------------------------------------------------------------
 
 	/** Default port for the web dashboard (= 8081) */
-	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+	public static final String DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
 
 	/** Default refresh interval for the web dashboard (= 3000 msecs) */
 	public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
@@ -65,8 +68,14 @@ public String getWebFrontendAddress() {
 		return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
 	}
 
-	public int getWebFrontendPort() {
-		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+	public Iterator<Integer> getWebFrontendPortRange() throws IllegalArgumentException {
+		String serverPortRange = config.getString(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+
+		try {
+			return NetUtils.getPortRangeFromString(serverPortRange);
+		} catch (Exception e) {
+			throw new IllegalArgumentException("Invalid port range definition: " + serverPortRange);
+		}
 	}
 
 	public long getRefreshInterval() {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a9cb630eabe..39a50348c95 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -83,6 +83,7 @@
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,7 +97,9 @@
 import javax.net.ssl.SSLEngine;
 import java.io.File;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
@@ -167,13 +170,6 @@ public WebRuntimeMonitor(
 		this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
 		
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
-
-		final String configuredAddress = cfg.getWebFrontendAddress();
-
-		final int configuredPort = cfg.getWebFrontendPort();
-		if (configuredPort < 0) {
-			throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort);
-		}
 		
 		final WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(config);
 		
@@ -414,19 +410,36 @@ protected void initChannel(SocketChannel ch) {
 		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
 		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 
+
+		final String configuredAddress = cfg.getWebFrontendAddress();
+		final Iterator<Integer> configuredPortRange = cfg.getWebFrontendPortRange();
+
 		this.bootstrap = new ServerBootstrap();
 		this.bootstrap
 				.group(bossGroup, workerGroup)
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
-		ChannelFuture ch;
-		if (configuredAddress == null) {
-			ch = this.bootstrap.bind(configuredPort);
-		} else {
-			ch = this.bootstrap.bind(configuredAddress, configuredPort);
+
+		try {
+			this.serverChannel = NetUtils.createServerFromPorts(configuredAddress, configuredPortRange, new NetUtils.ServerFactory<Channel>() {
+				@Override
+				public Channel create(String address, int port) throws Exception {
+					ChannelFuture ch;
+					if (address == null) {
+						ch = bootstrap.bind(port);
+					} else {
+						ch = bootstrap.bind(address, port);
+						LOG.info("Web frontend listening at configuredAddress " + address );
+					}
+
+					return ch.sync().channel();
+				}
+			});
+		} catch (Exception e) {
+			throw new BindException(e.getMessage());
 		}
-		this.serverChannel = ch.sync().channel();
+
 
 		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
 		String address = bindAddress.getAddress().getHostAddress();
@@ -526,6 +539,7 @@ private void cleanup() {
 		}
 	}
 
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 8af1f46bc42..dbeb209db1c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -138,7 +138,7 @@ public void testRedirectToLeader() throws Exception {
 			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+			config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "0");
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 			for (int i = 0; i < jobManagerSystem.length; i++) {
@@ -156,8 +156,8 @@ public void testRedirectToLeader() throws Exception {
 			String[] jobManagerAddress = new String[2];
 			for (int i = 0; i < jobManager.length; i++) {
 				Configuration jmConfig = config.clone();
-				jmConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-						webMonitor[i].getServerPort());
+				jmConfig.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+						String.valueOf(webMonitor[i].getServerPort()));
 
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
@@ -280,7 +280,7 @@ public void testLeaderNotAvailable() throws Exception {
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+			config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "0");
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
@@ -457,7 +457,7 @@ private WebRuntimeMonitor startWebRuntimeMonitor(
 
 		// Web frontend on random port
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "0");
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ebc9af86050..35a0e383904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import com.typesafe.config.Config;
@@ -29,6 +28,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -45,7 +45,6 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.BindException;
-import java.net.ServerSocket;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -67,10 +66,10 @@
 	 * @throws Exception
 	 */
 	public static ActorSystem startActorSystem(
-				Configuration configuration,
+				final Configuration configuration,
 				String listeningAddress,
 				String portRangeDefinition,
-				Logger logger) throws Exception {
+				final Logger logger) throws Exception {
 
 		// parse port range definition and create port iterator
 		Iterator<Integer> portsIterator;
@@ -80,44 +79,23 @@ public static ActorSystem startActorSystem(
 			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
 		}
 
-		while (portsIterator.hasNext()) {
-			// first, we check if the port is available by opening a socket
-			// if the actor system fails to start on the port, we try further
-			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
-				portsIterator,
-				new NetUtils.SocketFactory() {
-					@Override
-					public ServerSocket createSocket(int port) throws IOException {
-						return new ServerSocket(port);
-					}
-				});
-
-			int port;
-			if (availableSocket == null) {
-				throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
-			} else {
-				port = availableSocket.getLocalPort();
+		return NetUtils.createServerFromPorts(listeningAddress, portsIterator, new NetUtils.ServerFactory<ActorSystem>() {
+			@Override
+			public ActorSystem create(String address, int port) throws Exception {
 				try {
-					availableSocket.close();
-				} catch (IOException ignored) {}
-			}
-
-			try {
-				return startActorSystem(configuration, listeningAddress, port, logger);
-			}
-			catch (Exception e) {
-				// we can continue to try if this contains a netty channel exception
-				Throwable cause = e.getCause();
-				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
-						cause instanceof java.net.BindException)) {
-					throw e;
-				} // else fall through the loop and try the next port
+					return startActorSystem(configuration, address, port, logger);
+				}
+				catch (Exception e) {
+					Throwable cause = e.getCause();
+					if (cause instanceof org.jboss.netty.channel.ChannelException ||
+						cause instanceof java.net.BindException) {
+						throw new BindException(e.getMessage());
+					} else {
+						throw e;
+					}
+				}
 			}
-		}
-
-		// if we come here, we have exhausted the port range
-		throw new BindException("Could not start actor system on any port in port range "
-			+ portRangeDefinition);
+		});
 	}
 
 	/**
@@ -174,36 +152,28 @@ public static ActorSystem startActorSystem(
 	public static WebMonitor startWebMonitorIfConfigured(
 				Configuration config,
 				ActorSystem actorSystem,
-				ActorRef jobManager,
 				Logger logger) throws Exception {
 
-
 		// this ensures correct values are present in the web frontend
 		final Address address = AkkaUtils.getAddress(actorSystem);
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
 
-		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-			logger.info("Starting JobManager Web Frontend");
+		logger.info("Starting JobManager Web Frontend");
 
-			LeaderRetrievalService leaderRetrievalService = 
-				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+		LeaderRetrievalService leaderRetrievalService =
+			LeaderRetrievalUtils.createLeaderRetrievalService(config);
 
-			// start the web frontend. we need to load this dynamically
-			// because it is not in the same project/dependencies
-			WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
-				config, leaderRetrievalService, actorSystem);
+		// start the web frontend. we need to load this dynamically
+		// because it is not in the same project/dependencies
+		WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
+			config, leaderRetrievalService, actorSystem);
 
-			// start the web monitor
-			if (monitor != null) {
-				String jobManagerAkkaURL = AkkaUtils.getAkkaURL(actorSystem, jobManager);
-				monitor.start(jobManagerAkkaURL);
-			}
-			return monitor;
-		}
-		else {
-			return null;
+		if (monitor != null) {
+			monitor.start(JobManager.getRemoteJobManagerAkkaURL(config));
 		}
+
+		return monitor;
 	}
 
 	/**
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b0818195a9..8e20044f03a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -165,8 +165,8 @@ class JobManager(
    * web monitors can transparently interact with each job manager. Currently each web server has
    * to run in the actor system of the associated job manager.
    */
-  val webMonitorPort : Int = flinkConfiguration.getInteger(
-    ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
+  val webMonitorPort : Int = Integer.parseInt(flinkConfiguration.getString(
+    ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "-1"))
 
   /** The default directory for savepoints. */
   val defaultSavepointDir: String = ConfigurationUtil.getStringWithDeprecatedKeys(
@@ -2228,28 +2228,24 @@ object JobManager {
     configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
     configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
 
-    val webMonitor: Option[WebMonitor] =
-      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-        LOG.info("Starting JobManager web frontend")
-        val leaderRetrievalService = LeaderRetrievalUtils
-          .createLeaderRetrievalService(configuration)
+    val webMonitor: Option[WebMonitor] = {
+      LOG.info("Starting JobManager web frontend")
+      val leaderRetrievalService = LeaderRetrievalUtils
+        .createLeaderRetrievalService(configuration)
 
-        // start the web frontend. we need to load this dynamically
-        // because it is not in the same project/dependencies
-        val webServer = WebMonitorUtils.startWebRuntimeMonitor(
-          configuration,
-          leaderRetrievalService,
-          jobManagerSystem)
+      // start the web frontend. we need to load this dynamically
+      // because it is not in the same project/dependencies
+      val webServer = WebMonitorUtils.startWebRuntimeMonitor(
+        configuration,
+        leaderRetrievalService,
+        jobManagerSystem)
 
-        Option(webServer)
-      }
-      else {
-        None
-      }
+      Option(webServer)
+    }
 
     // Reset the port (necessary in case of automatic port selection)
-    webMonitor.foreach{ monitor => configuration.setInteger(
-      ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, monitor.getServerPort) }
+    webMonitor.foreach{ monitor => configuration.setString(
+      ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, String.valueOf(monitor.getServerPort)) }
 
     try {
       // bring up the job manager actor
@@ -2402,7 +2398,9 @@ object JobManager {
     }
 
     if (cliOptions.getWebUIPort() >= 0) {
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, cliOptions.getWebUIPort())
+      configuration.setString(
+        ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+        String.valueOf(cliOptions.getWebUIPort()))
     }
 
     if (cliOptions.getHost() != null) {
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 07fb9969c3e..8f76de8c70d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -379,8 +379,7 @@ abstract class FlinkMiniCluster(
       jobManagerAkkaURL: String)
     : Option[WebMonitor] = {
     if(
-      config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
-        config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+      config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
 
       // TODO: Add support for HA: Make web server work independently from the JM
       val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerAkkaURL)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 49c28e65d0f..4bfb4aa753f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -196,7 +196,7 @@ private static void printProcessLog(String log) {
 		public static void main(String[] args) {
 			try {
 				Configuration config = new Configuration();
-				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+				config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "-1");
 
 				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
 				System.exit(0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 42338cda445..b4a24a9a4a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -65,7 +65,7 @@ public static Configuration configureZooKeeperHA(
 		checkNotNull(fsStateHandlePath, "File state handle backend path");
 
 		// Web frontend, you have been dismissed. Sorry.
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "-1");
 
 		// ZooKeeper recovery mode
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dab0a06fb93..deff0c0693d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1655,8 +1655,7 @@ public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configu
 		checkNotNull(conf, "conf");
 
 		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+			conf.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 		}
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index cc7c0e29884..c8b54271166 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -147,7 +147,7 @@ public static LocalFlinkMiniCluster startCluster(
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
 
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "8081");
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 4b24f424979..72cfc067abe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -189,9 +189,9 @@ private static Configuration createConfiguration(String baseDirectory, Map<Strin
 			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
 		}
 
-		// if a web monitor shall be started, set the port to random binding
-		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		// if the user doesn't specify port, set the port to random binding
+		if (!configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			configuration.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "0");
 		}
 
 		// if the user has set the deprecated YARN-specific config keys, we add the 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 492fc0bf086..763aa613217 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -326,13 +326,30 @@ protected int runApplicationMaster(Configuration config) {
 
 			// ---- (4) start the actors and components in this order:
 
-			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
-			// 2) Web Monitor (we need its port to register)
+			// 1) Web Monitor (we need its port to register)
+			// 2) JobManager & Archive (in non-HA case, the leader service takes this)
 			// 3) Resource Master for YARN
 			// 4) Process reapers for the JobManager and Resource Master
 
+			// 1: the web monitor
+			LOG.debug("Starting Web Frontend");
+
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, LOG);
+
+			String protocol = "http://";
+			if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
+				protocol = "https://";
+			}
+			final String webMonitorURL = webMonitor == null ? null :
+				protocol + appMasterHostname + ":" + webMonitor.getServerPort();
+
+			if (webMonitor != null) {
+				config.setString(
+					ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, String.valueOf(webMonitor.getServerPort()));
+			}
 
-			// 1: the JobManager
+			// 2: the JobManager
 			LOG.debug("Starting JobManager actor");
 
 			// we start the JobManager with its standard name
@@ -346,20 +363,6 @@ protected int runApplicationMaster(Configuration config) {
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
-
-			// 2: the web monitor
-			LOG.debug("Starting Web Frontend");
-
-			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
-
-			String protocol = "http://";
-			if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
-				protocol = "https://";
-			}
-			final String webMonitorURL = webMonitor == null ? null :
-				protocol + appMasterHostname + ":" + webMonitor.getServerPort();
-
 			// 3: Flink's Yarn ResourceManager
 			LOG.debug("Starting YARN Flink Resource Manager");
 
@@ -495,9 +498,9 @@ private static Configuration createConfiguration(String baseDirectory, Map<Strin
 			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
 		}
 
-		// if a web monitor shall be started, set the port to random binding
-		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		// if the user doesn't specify port, set the port to random binding
+		if (!configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			configuration.setString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "0");
 		}
 
 		// if the user has set the deprecated YARN-specific config keys, we add the 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services