Posted to issues@flink.apache.org by barcahead <gi...@git.apache.org> on 2017/02/22 14:40:01 UTC

[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

GitHub user barcahead opened a pull request:

    https://github.com/apache/flink/pull/3391

    [FLINK-5758] [yarn] support port range for web monitor

    1. `jobmanager.web.port` now supports a port range and has a string-typed default value, "8081".
    2. A helper method `createServerFromPorts` is added to select a port and create the corresponding server; `BootstrapTools.startActorSystem` is refactored to use this helper.
    3. In `YarnApplicationMasterRunner.runApplicationMaster`, the web monitor is now created first so that it binds a fixed web port, which is stored in the config before the JobManager is created. The JobManager therefore knows the web port; this information is useful when a new JobManager is elected.
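To illustrate item 1: a port-range value such as `"50100-50200"` or `"50100,50105-50110"` has to be expanded into the `Iterator<Integer>` that the new helper consumes. Flink has its own parser for this (`NetUtils.getPortRangeFromString`, which appears in the diffs below); the snippet here is only a simplified, hypothetical stand-in (the class `PortRangeSketch` is invented) that accepts comma-separated single ports and inclusive `from-to` ranges.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for a port-range parser; not Flink's implementation.
final class PortRangeSketch {

    private PortRangeSketch() {}

    /** Expands e.g. "8081" or "50100-50102,50200" into an iterator over the listed ports. */
    static Iterator<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String trimmed = part.trim();
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                // A single port, e.g. "8081".
                ports.add(checkPort(Integer.parseInt(trimmed)));
            } else {
                // An inclusive range, e.g. "50100-50102".
                int from = checkPort(Integer.parseInt(trimmed.substring(0, dash).trim()));
                int to = checkPort(Integer.parseInt(trimmed.substring(dash + 1).trim()));
                if (from > to) {
                    throw new IllegalArgumentException("Invalid port range: " + trimmed);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port);
                }
            }
        }
        return ports.iterator();
    }

    private static int checkPort(int port) {
        // 0 means "any free port"; anything outside 0..65535 is rejected.
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
```

Note that a disabling value such as `"-1"` is rejected by this parser (`Integer.parseInt("")` throws a `NumberFormatException`), so it would have to be checked before parsing.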

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/barcahead/flink FLINK-5758

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3391
    
----
commit af5dcfa9cdde0d6d31a272be967a5e5547f8564a
Author: fengyelei <fe...@huawei.com>
Date:   2017-02-22T14:23:06Z

    [FLINK-5758] [yarn] support port range for web monitor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by barcahead <gi...@git.apache.org>.
Github user barcahead commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    @StephanEwen I will update the PR later


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103951515
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java ---
    @@ -65,8 +68,14 @@ public String getWebFrontendAddress() {
     		return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
     	}
     
    -	public int getWebFrontendPort() {
    -		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
    +	public Iterator<Integer> getWebFrontendPortRange() throws IllegalArgumentException {
    +		String serverPortRange = config.getString(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
    +
    +		try {
    +			return NetUtils.getPortRangeFromString(serverPortRange);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + serverPortRange);
    --- End diff --
    
    And add the cause `e` for this exception.
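A minimal sketch of the requested change, assuming a simplified stand-in parser (single ports only, hypothetical) in place of `NetUtils.getPortRangeFromString`. `IllegalArgumentException` has a `(String, Throwable)` constructor, so the cause can be passed directly:

```java
import java.util.Collections;
import java.util.Iterator;

final class WebMonitorConfigSketch {

    private WebMonitorConfigSketch() {}

    static Iterator<Integer> getWebFrontendPortRange(String serverPortRange) {
        try {
            // Hypothetical stand-in parser: accepts a single port only.
            int port = Integer.parseInt(serverPortRange.trim());
            return Collections.singletonList(port).iterator();
        } catch (Exception e) {
            // Passing 'e' as the cause keeps the original error and its stack trace.
            throw new IllegalArgumentException("Invalid port range definition: " + serverPortRange, e);
        }
    }
}
```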


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103205236
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
    @@ -379,8 +379,7 @@ abstract class FlinkMiniCluster(
           jobManagerAkkaURL: String)
         : Option[WebMonitor] = {
         if(
    -      config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
    -        config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
    +      config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
    --- End diff --
    
    Why did you remove the other condition? If the port has been set to `-1`, the documentation says that no web server will be started.
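Once the key holds a string, the old `getInteger(...) >= 0` check no longer applies directly. One hedged way to keep the documented behaviour is to treat a value that parses as a single negative integer as "disabled" and anything else (a single port or a range) as enabled. `WebFrontendGuard` and `isWebFrontendEnabled` are hypothetical names:

```java
// Hypothetical helper that keeps the documented "-1 disables the web frontend" rule
// once the port setting is a string that may also hold a range such as "50100-50200".
final class WebFrontendGuard {

    private WebFrontendGuard() {}

    static boolean isWebFrontendEnabled(boolean startWebServer, String portSetting) {
        if (!startWebServer) {
            return false;
        }
        if (portSetting == null) {
            return true; // nothing configured: the default port is used
        }
        try {
            // A single integer keeps the old semantics: negative values disable the frontend.
            return Integer.parseInt(portSetting.trim()) >= 0;
        } catch (NumberFormatException e) {
            // Not a single integer (e.g. a range); leave validation to the range parser.
            return true;
        }
    }
}
```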


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103956163
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -1252,8 +1257,8 @@
     			.noDefaultValue();
     
     	/** The config key for the port of the JobManager web frontend.
    -	 * Setting this value to {@code -1} disables the web frontend. */
    -	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
    +	 * Setting this value to {@code "-1"} disables the web frontend. */
    +	public static final String DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = "8081";
    --- End diff --
    
    To be honest, I'm not sure whether we can make this change, because `DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT` is part of Flink's public API and, thus, your change is API breaking.


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103201834
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/NetUtils.java ---
    @@ -373,6 +374,46 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
     	}
     
     	/**
    +	 * Tries to create a server from the given sets of ports.
    --- End diff --
    
    typo: "set"


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103202170
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/NetUtils.java ---
    @@ -373,6 +374,46 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
     	}
     
     	/**
    +	 * Tries to create a server from the given sets of ports.
    +	 *
    +	 * @param address An address to listen at.
    +	 * @param portRange A set of ports to choose from.
    +	 * @param serverFactory A factory for creating server.
    +	 * @return the created server.
    +	 * @throws BindException If port range is exhausted.
    +	 */
    +	public static <T> T createServerFromPorts(String address, Iterator<Integer> portRange, ServerFactory<T> serverFactory) throws Exception {
    +
    +		while (portRange.hasNext()) {
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    +				portRange,
    +				new NetUtils.SocketFactory() {
    +					@Override
    +					public ServerSocket createSocket(int port) throws IOException {
    +						return new ServerSocket(port);
    +					}
    +				});
    +
    +			int port;
    +			if (availableSocket == null) {
    +				throw new BindException("Could not start server on any port in port range: " + portRange);
    --- End diff --
    
    I guess we should only throw this one if our `iterator` is exhausted, right?
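A simplified sketch of the loop shape this comment suggests, where the `BindException` is thrown only after every port has been tried. The nested `ServerFactory<T>` mirrors the interface from the diff; the probe via `ServerSocket` is left out, and the class name is hypothetical. The sketch also avoids appending the iterator to the message, since an iterator's `toString()` typically does not list the ports.

```java
import java.net.BindException;
import java.util.Iterator;

// Hypothetical, simplified version of the helper under review.
final class PortRangeServers {

    private PortRangeServers() {}

    interface ServerFactory<T> {
        T create(String address, int port) throws Exception;
    }

    static <T> T createServerFromPorts(String address, Iterator<Integer> ports, ServerFactory<T> factory)
            throws Exception {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return factory.create(address, port);
            } catch (BindException ignored) {
                // Port is taken: fall through and try the next one.
            }
        }
        // Only reached once every port in the range has been tried.
        throw new BindException("Could not start server on any port in the given port range");
    }
}
```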


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103202459
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/NetUtils.java ---
    @@ -373,6 +374,46 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
     	}
     
     	/**
    +	 * Tries to create a server from the given sets of ports.
    +	 *
    +	 * @param address An address to listen at.
    +	 * @param portRange A set of ports to choose from.
    +	 * @param serverFactory A factory for creating server.
    +	 * @return the created server.
    +	 * @throws BindException If port range is exhausted.
    +	 */
    +	public static <T> T createServerFromPorts(String address, Iterator<Integer> portRange, ServerFactory<T> serverFactory) throws Exception {
    +
    +		while (portRange.hasNext()) {
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    +				portRange,
    +				new NetUtils.SocketFactory() {
    +					@Override
    +					public ServerSocket createSocket(int port) throws IOException {
    +						return new ServerSocket(port);
    +					}
    +				});
    +
    +			int port;
    +			if (availableSocket == null) {
    +				throw new BindException("Could not start server on any port in port range: " + portRange);
    +			} else {
    +				port = availableSocket.getLocalPort();
    +				try {
    +					availableSocket.close();
    +				} catch (IOException ignored) {}
    +			}
    +
    +			try {
    +				return serverFactory.create(address, port);
    +			} catch (BindException e) {}
    --- End diff --
    
    What if the creation of the server fails for some other reason? Shouldn't we retry the other ports as well?


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103980334
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -1252,8 +1257,8 @@
     			.noDefaultValue();
     
     	/** The config key for the port of the JobManager web frontend.
    -	 * Setting this value to {@code -1} disables the web frontend. */
    -	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
    +	 * Setting this value to {@code "-1"} disables the web frontend. */
    +	public static final String DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = "8081";
    --- End diff --
    
    The right way to do it is to leave this variable unchanged, mark it as deprecated, and introduce a new default value of type string for the job manager web frontend port.
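A sketch of the suggested deprecate-and-introduce approach. The name of the new constant, `DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT_RANGE`, and the holder class are hypothetical; deriving the new value from the old one is one way to keep both defaults in sync:

```java
// Hypothetical sketch of the migration; only the int constant exists in ConfigConstants.
final class ConfigConstantsSketch {

    private ConfigConstantsSketch() {}

    /**
     * The default port of the JobManager web frontend.
     *
     * @deprecated Use {@link #DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT_RANGE} instead.
     */
    @Deprecated
    public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;

    /**
     * The default port (range) of the JobManager web frontend. It is a string so that
     * ranges such as "50100-50200" can be expressed; a configured value of "-1"
     * disables the web frontend.
     */
    public static final String DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT_RANGE =
        String.valueOf(DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
}
```

Keeping the old `int` field untouched means existing user code that reads it still compiles and behaves as before.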


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by barcahead <gi...@git.apache.org>.
Github user barcahead commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    @StephanEwen could you help review this PR, please?


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r104122545
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -174,36 +152,28 @@ public static ActorSystem startActorSystem(
     	public static WebMonitor startWebMonitorIfConfigured(
     				Configuration config,
     				ActorSystem actorSystem,
    -				ActorRef jobManager,
     				Logger logger) throws Exception {
     
    -
     		// this ensures correct values are present in the web frontend
     		final Address address = AkkaUtils.getAddress(actorSystem);
     		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
     		config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
     
    -		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
    -			logger.info("Starting JobManager Web Frontend");
    +		logger.info("Starting JobManager Web Frontend");
     
    -			LeaderRetrievalService leaderRetrievalService = 
    -				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
    +		LeaderRetrievalService leaderRetrievalService =
    +			LeaderRetrievalUtils.createLeaderRetrievalService(config);
     
    -			// start the web frontend. we need to load this dynamically
    -			// because it is not in the same project/dependencies
    -			WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
    -				config, leaderRetrievalService, actorSystem);
    +		// start the web frontend. we need to load this dynamically
    +		// because it is not in the same project/dependencies
    +		WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
    +			config, leaderRetrievalService, actorSystem);
     
    -			// start the web monitor
    -			if (monitor != null) {
    -				String jobManagerAkkaURL = AkkaUtils.getAkkaURL(actorSystem, jobManager);
    -				monitor.start(jobManagerAkkaURL);
    -			}
    -			return monitor;
    -		}
    -		else {
    -			return null;
    +		if (monitor != null) {
    +			monitor.start(JobManager.getRemoteJobManagerAkkaURL(config));
    --- End diff --
    
    I think it is not a good idea to bake the constraint into this method that the `JobManager` always has to be started under the same actor name. This is brittle; it would be better to pass the JobManager's actor name to the method.
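A hedged sketch of passing the name in instead of hard-coding it. The URL layout follows Akka's remote actor path format (`akka.tcp://<system>@<host>:<port>/user/<name>`); the actor system name `flink` and the helper `remoteJobManagerAkkaUrl` are assumptions for illustration, not Flink's API:

```java
// Hypothetical helper: the caller passes the JobManager's actor name explicitly.
// The actor system name "flink" is an assumption; IPv6 hosts would also need brackets.
final class JobManagerUrls {

    private JobManagerUrls() {}

    static String remoteJobManagerAkkaUrl(String host, int port, String jobManagerActorName) {
        if (jobManagerActorName == null || jobManagerActorName.isEmpty()) {
            throw new IllegalArgumentException("The JobManager actor name must be provided");
        }
        return "akka.tcp://flink@" + host + ":" + port + "/user/" + jobManagerActorName;
    }
}
```

`startWebMonitorIfConfigured` would then take an additional `String jobManagerActorName` parameter and hand it to such a helper, so a differently named JobManager keeps working.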


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103954040
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/NetUtils.java ---
    @@ -373,6 +374,46 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
     	}
     
     	/**
    +	 * Tries to create a server from the given sets of ports.
    +	 *
    +	 * @param address An address to listen at.
    +	 * @param portRange A set of ports to choose from.
    +	 * @param serverFactory A factory for creating server.
    +	 * @return the created server.
    +	 * @throws BindException If port range is exhausted.
    +	 */
    +	public static <T> T createServerFromPorts(String address, Iterator<Integer> portRange, ServerFactory<T> serverFactory) throws Exception {
    --- End diff --
    
    I think we should add some tests for this function, especially to properly test how it behaves when exceptions are thrown.
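A plain-Java sketch of such exception-focused tests (JUnit omitted to keep it self-contained). `createServerFromPorts` here is a simplified stand-in for the helper under review, assuming the contract that a `BindException` moves on to the next port while any other exception fails immediately:

```java
import java.net.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of exception-focused tests; a real test would use JUnit.
final class CreateServerFromPortsTestSketch {

    interface ServerFactory<T> {
        T create(String address, int port) throws Exception;
    }

    // Simplified stand-in for the helper under review.
    static <T> T createServerFromPorts(String address, Iterator<Integer> ports, ServerFactory<T> factory)
            throws Exception {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return factory.create(address, port);
            } catch (BindException ignored) {
                // Port in use: try the next one.
            }
        }
        throw new BindException("Port range exhausted");
    }

    /** A failure other than BindException must propagate at once and stop the iteration. */
    static List<Integer> portsTriedOnOtherFailure() throws Exception {
        List<Integer> tried = new ArrayList<>();
        try {
            createServerFromPorts("localhost", Arrays.asList(1, 2, 3).iterator(), (address, port) -> {
                tried.add(port);
                throw new IllegalStateException("unexpected failure");
            });
            throw new AssertionError("expected IllegalStateException");
        } catch (IllegalStateException expected) {
            return tried;
        }
    }

    /** BindExceptions on every port must end in a BindException after all ports were tried. */
    static List<Integer> portsTriedWhenAllBound() throws Exception {
        List<Integer> tried = new ArrayList<>();
        try {
            createServerFromPorts("localhost", Arrays.asList(1, 2, 3).iterator(), (address, port) -> {
                tried.add(port);
                throw new BindException("taken");
            });
            throw new AssertionError("expected BindException");
        } catch (BindException expected) {
            return tried;
        }
    }
}
```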


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    I'm currently reviewing it @StephanEwen.


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103202714
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java ---
    @@ -65,8 +68,14 @@ public String getWebFrontendAddress() {
     		return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
     	}
     
    -	public int getWebFrontendPort() {
    -		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
    +	public Iterator<Integer> getWebFrontendPortRange() throws IllegalArgumentException {
    +		String serverPortRange = config.getString(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
    +
    +		try {
    +			return NetUtils.getPortRangeFromString(serverPortRange);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + serverPortRange);
    --- End diff --
    
    Maybe we could make the message more specific: "Invalid port range definition for the job manager's web monitor".


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103204548
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
    @@ -414,19 +410,36 @@ protected void initChannel(SocketChannel ch) {
     		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
     		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
     
    +
    +		final String configuredAddress = cfg.getWebFrontendAddress();
    +		final Iterator<Integer> configuredPortRange = cfg.getWebFrontendPortRange();
    +
     		this.bootstrap = new ServerBootstrap();
     		this.bootstrap
     				.group(bossGroup, workerGroup)
     				.channel(NioServerSocketChannel.class)
     				.childHandler(initializer);
     
    -		ChannelFuture ch;
    -		if (configuredAddress == null) {
    -			ch = this.bootstrap.bind(configuredPort);
    -		} else {
    -			ch = this.bootstrap.bind(configuredAddress, configuredPort);
    +
    +		try {
    +			this.serverChannel = NetUtils.createServerFromPorts(configuredAddress, configuredPortRange, new NetUtils.ServerFactory<Channel>() {
    +				@Override
    +				public Channel create(String address, int port) throws Exception {
    +					ChannelFuture ch;
    +					if (address == null) {
    +						ch = bootstrap.bind(port);
    +					} else {
    +						ch = bootstrap.bind(address, port);
    +						LOG.info("Web frontend listening at configuredAddress " + address );
    --- End diff --
    
    Maybe it's better to log the full address. Moreover, you should use the placeholder syntax. And maybe it makes also sense to log it for the case where `address==null`. In that case `InetAddress.anyLocalAddress` is used. Maybe we could make this explicit and then log it.
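A hedged sketch of making the wildcard case explicit: `new InetSocketAddress(port)` uses the wildcard address, so resolving the address up front gives one value that can be both bound and logged. The class and method names are hypothetical. With SLF4J the log call would use the placeholder form, e.g. `LOG.info("Web frontend listening at {}", bindAddress)`; logging the bound channel's `localAddress()` after `ch.sync()` would additionally report the port that was actually bound.

```java
import java.net.InetSocketAddress;

// Hypothetical helper: resolve the bind address up front so the wildcard case
// is explicit and the same value can be passed to bind(...) and to the log statement.
final class BindAddresses {

    private BindAddresses() {}

    static InetSocketAddress resolveBindAddress(String configuredAddress, int port) {
        if (configuredAddress == null) {
            // No address configured: listen on the wildcard address (all interfaces).
            return new InetSocketAddress(port);
        }
        return new InetSocketAddress(configuredAddress, port);
    }
}
```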


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    @barcahead Thanks for contributing this.
    I can try and get to this later this week. Big pull request backlog right now ;-)


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103203014
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
    @@ -414,19 +410,36 @@ protected void initChannel(SocketChannel ch) {
     		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
     		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
     
    +
    +		final String configuredAddress = cfg.getWebFrontendAddress();
    +		final Iterator<Integer> configuredPortRange = cfg.getWebFrontendPortRange();
    +
     		this.bootstrap = new ServerBootstrap();
     		this.bootstrap
     				.group(bossGroup, workerGroup)
     				.channel(NioServerSocketChannel.class)
     				.childHandler(initializer);
     
    -		ChannelFuture ch;
    -		if (configuredAddress == null) {
    -			ch = this.bootstrap.bind(configuredPort);
    -		} else {
    -			ch = this.bootstrap.bind(configuredAddress, configuredPort);
    +
    +		try {
    +			this.serverChannel = NetUtils.createServerFromPorts(configuredAddress, configuredPortRange, new NetUtils.ServerFactory<Channel>() {
    +				@Override
    +				public Channel create(String address, int port) throws Exception {
    +					ChannelFuture ch;
    +					if (address == null) {
    +						ch = bootstrap.bind(port);
    +					} else {
    +						ch = bootstrap.bind(address, port);
    +						LOG.info("Web frontend listening at configuredAddress " + address );
    +					}
    +
    +					return ch.sync().channel();
    +				}
    +			});
    +		} catch (Exception e) {
    +			throw new BindException(e.getMessage());
    --- End diff --
    
    Only forwarding exception messages is not good. Please preserve the cause of the failure.
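One detail to watch: `java.net.BindException` only has a no-argument constructor and a `(String)` constructor, so the cause has to be attached with `Throwable.initCause`. A minimal sketch (the helper name is hypothetical):

```java
import java.net.BindException;

final class BindExceptions {

    private BindExceptions() {}

    /** Creates a BindException that keeps the given cause (and thus its stack trace). */
    static BindException withCause(String message, Throwable cause) {
        BindException bindException = new BindException(message);
        // BindException has no (String, Throwable) constructor, so attach the cause explicitly.
        bindException.initCause(cause);
        return bindException;
    }
}
```

In the catch block this would read `throw BindExceptions.withCause(e.getMessage(), e);`.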


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    @barcahead what is the status of this PR?


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3391
  
    @barcahead Please let us know if you want to follow up on this PR...


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103980465
  
    --- Diff: flink-core/pom.xml ---
    @@ -165,6 +165,7 @@ under the License.
     							<exclude>org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR</exclude>
     							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY</exclude>
     							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY</exclude>
    +							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT</exclude>
    --- End diff --
    
    Introducing a new variable will also help us to not add this exclude statement.


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r104121909
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---
    @@ -326,13 +326,30 @@ protected int runApplicationMaster(Configuration config) {
     
     			// ---- (4) start the actors and components in this order:
     
    -			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
    -			// 2) Web Monitor (we need its port to register)
    +			// 1) Web Monitor (we need its port to register)
    +			// 2) JobManager & Archive (in non-HA case, the leader service takes this)
    --- End diff --
    
    Why do you change the order of the instantiation? This is not clear to me since you left the original order in the Mesos case.


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103204770
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/NetUtils.java ---
    @@ -383,4 +424,8 @@ public static String getWildcardIPAddress() {
     	public interface SocketFactory {
     		ServerSocket createSocket(int port) throws IOException;
     	}
    +
    +	public interface ServerFactory<T> {
    +		T create(String address, int port) throws Exception;
    --- End diff --
    
    Java docs are missing. You should document that a `BindException` will be ignored and that all other exceptions will cause the `createServerFromPorts` method to fail.
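A possible shape for the missing Javadoc, spelling out the contract this comment asks for. The wording is only a suggestion:

```java
/**
 * Factory for a server that listens at a given address and port; used by
 * {@code createServerFromPorts} to try the ports of a range one after another.
 *
 * @param <T> the type of the created server
 */
interface ServerFactory<T> {

    /**
     * Creates a server listening at the given address and port.
     *
     * @param address the address to listen at, or {@code null} for the wildcard address
     * @param port the port to bind to
     * @return the created server
     * @throws java.net.BindException if the port is already in use; this is ignored by
     *         {@code createServerFromPorts}, which then tries the next port of the range
     * @throws Exception any other failure, which makes {@code createServerFromPorts}
     *         fail immediately without trying further ports
     */
    T create(String address, int port) throws Exception;
}
```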


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103203962
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -80,44 +79,23 @@ public static ActorSystem startActorSystem(
     			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
     		}
     
    -		while (portsIterator.hasNext()) {
    -			// first, we check if the port is available by opening a socket
    -			// if the actor system fails to start on the port, we try further
    -			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    -				portsIterator,
    -				new NetUtils.SocketFactory() {
    -					@Override
    -					public ServerSocket createSocket(int port) throws IOException {
    -						return new ServerSocket(port);
    -					}
    -				});
    -
    -			int port;
    -			if (availableSocket == null) {
    -				throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
    -			} else {
    -				port = availableSocket.getLocalPort();
    +		return NetUtils.createServerFromPorts(listeningAddress, portsIterator, new NetUtils.ServerFactory<ActorSystem>() {
    +			@Override
    +			public ActorSystem create(String address, int port) throws Exception {
     				try {
    -					availableSocket.close();
    -				} catch (IOException ignored) {}
    -			}
    -
    -			try {
    -				return startActorSystem(configuration, listeningAddress, port, logger);
    -			}
    -			catch (Exception e) {
    -				// we can continue to try if this contains a netty channel exception
    -				Throwable cause = e.getCause();
    -				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
    -						cause instanceof java.net.BindException)) {
    -					throw e;
    -				} // else fall through the loop and try the next port
    +					return startActorSystem(configuration, address, port, logger);
    +				}
    +				catch (Exception e) {
    +					Throwable cause = e.getCause();
    +					if (cause instanceof org.jboss.netty.channel.ChannelException ||
    +						cause instanceof java.net.BindException) {
    +						throw new BindException(e.getMessage());
    --- End diff --
    
    It is not good to throw away the stack trace of the cause.


[GitHub] flink pull request #3391: [FLINK-5758] [yarn] support port range for web mon...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3391#discussion_r103203690
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
    @@ -414,19 +410,36 @@ protected void initChannel(SocketChannel ch) {
     		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
     		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
     
    +
    +		final String configuredAddress = cfg.getWebFrontendAddress();
    +		final Iterator<Integer> configuredPortRange = cfg.getWebFrontendPortRange();
    +
     		this.bootstrap = new ServerBootstrap();
     		this.bootstrap
     				.group(bossGroup, workerGroup)
     				.channel(NioServerSocketChannel.class)
     				.childHandler(initializer);
     
    -		ChannelFuture ch;
    -		if (configuredAddress == null) {
    -			ch = this.bootstrap.bind(configuredPort);
    -		} else {
    -			ch = this.bootstrap.bind(configuredAddress, configuredPort);
    +
    +		try {
    +			this.serverChannel = NetUtils.createServerFromPorts(configuredAddress, configuredPortRange, new NetUtils.ServerFactory<Channel>() {
    +				@Override
    +				public Channel create(String address, int port) throws Exception {
    +					ChannelFuture ch;
    +					if (address == null) {
    +						ch = bootstrap.bind(port);
    +					} else {
    +						ch = bootstrap.bind(address, port);
    +						LOG.info("Web frontend listening at configuredAddress " + address );
    +					}
    +
    +					return ch.sync().channel();
    +				}
    +			});
    +		} catch (Exception e) {
    +			throw new BindException(e.getMessage());
    --- End diff --
    
    Not sure whether all exceptions which you're catching here should be packed in a `BindException`. For example, what happens if you catch the `InterruptedException` here which can originate from `ch.sync()`?
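A hedged sketch of narrower handling, assuming the surrounding method may only throw `IOException`: a `BindException` (itself an `IOException`) passes through unchanged, an interrupt restores the thread's interrupt flag and is reported as an `InterruptedIOException`, and any other failure keeps its cause instead of being relabelled as a bind problem. If the method's signature allows it, simply letting `InterruptedException` propagate would be simpler still. `ServerStartup.start` is a hypothetical name:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Callable;

// Hypothetical sketch for a caller whose signature only allows IOException.
final class ServerStartup {

    private ServerStartup() {}

    static <T> T start(Callable<T> bind) throws IOException {
        try {
            return bind.call();
        } catch (IOException e) {
            // Includes BindException: pass it through unchanged.
            throw e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting, so callers can still see it.
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted = new InterruptedIOException("Interrupted while starting the server");
            interrupted.initCause(e);
            throw interrupted;
        } catch (Exception e) {
            // Any other failure keeps its cause instead of being relabelled as a bind failure.
            throw new IOException("Could not start the server", e);
        }
    }
}
```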

