Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/04/10 11:56:46 UTC

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/5834

    [FLINK-9153] TaskManagerRunner should support rpc port range

    ## What is the purpose of the change
    
    *This pull request makes `TaskManagerRunner` (FLIP-6) support an RPC port range*
    
    
    ## Brief change log
    
      - *Fixed a config item reading bug and let the TaskManager runner support an RPC port range*
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-9153

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5834.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5834
    
----
commit 212884207844d9485eb63e5e1ba32118e9fa1567
Author: yanghua <ya...@...>
Date:   2018-04-10T11:52:16Z

    [FLINK-9153] TaskManagerRunner should support rpc port range

----


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @zentol 


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @zentol refactored based on your suggestion, please review it again


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5834


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184037491
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    It seems you are right; I will remove the socket check code segment.


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184029857
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
    --- End diff --
    
    No need for the override default, as `0` is already the default.
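    
    To illustrate the point, here is a minimal sketch of how an option that carries its own default makes the override redundant. `OptionSketch` and `ConfigSketch` are hypothetical stand-ins written for this example, not Flink's `ConfigOption` and `Configuration`, and `taskmanager.rpc.port` is assumed to be the key behind `TaskManagerOptions.RPC_PORT`.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for a config option that declares its own default.
    final class OptionSketch {
        final String key;
        final String defaultValue;

        OptionSketch(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Hypothetical stand-in for a configuration object backed by a map.
    final class ConfigSketch {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        // Falls back to the default declared on the option itself.
        String getString(OptionSketch option) {
            return values.getOrDefault(option.key, option.defaultValue);
        }

        // Falls back to a caller-supplied default instead of the option's own.
        String getString(OptionSketch option, String overrideDefault) {
            return values.getOrDefault(option.key, overrideDefault);
        }
    }
    ```
    
    When the option already declares `"0"` as its default, both lookups return the same value for an unset key, so the one-argument form would presumably be enough.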


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184030230
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    +				portsIterator,
    +				new NetUtils.SocketFactory() {
    +					@Override
    +					public ServerSocket createSocket(int port) throws IOException {
    +						return new ServerSocket(port);
    +					}
    +				});
    +
    +			int port;
    +			if (availableSocket == null) {
    +				throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
    +			} else {
    +				port = availableSocket.getLocalPort();
    +				try {
    +					availableSocket.close();
    +				} catch (IOException ignored) {}
    +			}
    +
    +			try {
    +				return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, port, configuration);
    +			}
    +			catch (Exception e) {
    +				// we can continue to try if this contains a netty channel exception
    +				Throwable cause = e.getCause();
    +				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
    +					cause instanceof java.net.BindException)) {
    +					throw e;
    +				} // else fall through the loop and try the next port
    +			}
    +		}
     
    -		return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
    +		// if we come here, we have exhausted the port range
    +		throw new BindException("Could not start actor system on any port in port range "
    --- End diff --
    
    This should mention the TaskManager rather than the actor system.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    LGTM 👍 Will rebase and merge once my Travis is green.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @zentol 


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    Hi @StephanEwen, why do the committers not review the older PRs? I have several PRs that have been waiting for a long time.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @tillrohrmann In mode `new` (FLIP-6), the `TaskManagerRunner` cannot be given a port range. I suggest we merge this PR as a hotfix and include it in the 1.5 release.


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184035989
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    I know you're re-using existing code. That doesn't mean the code is perfect and shouldn't be changed. 
    
    The code explicitly says that the actor system initialization can also fail with a `BindException`:
    ```
    try {
    	return startActorSystem(configuration, listeningAddress, port, logger);
    }
    catch (Exception e) {
    	// we can continue to try if this contains a netty channel exception
    	Throwable cause = e.getCause();
    	if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
    			cause instanceof java.net.BindException)) {
    		throw e;
    	} // else fall through the loop and try the next port
    }
    ```
    
    This means that the check with the socket is redundant.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @zentol 


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    Generally I would refrain from pinging specific committers. This can be counter-productive as it discourages other committers, or even contributors, from taking a look.
    
    Similarly, pinging a committer in every PR because you saw them reviewing other PRs at the time (as has happened to me last week) isn't that helpful either. It just pushes even more work/pressure on the few committers that actually do reviews.
    
    (Note that frequent pinging inherently puts more work on me as I actually monitor all PR updates!)
    
    Lastly, please keep in mind that not all PRs have the same priority, especially when working towards the next release.
    Documentation changes (#5773) _can_ be merged after a release (since the docs aren't part of the release!),
    code-cleanups (#5777, #5799) and minor fixes (#5798) are usually non-critical and always pose the risk of introducing new bugs which is the last thing we want shortly before a release.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    @zentol this PR was also reviewed by you and has been open for a long time. Can you review it again? Thanks~


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    @zentol this PR was also reviewed by you and has been open for a long time. Can you review it again? Thanks~


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184031845
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    @zentol the documentation already states that it supports a port range: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port
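    
    As a rough illustration of what a port range definition looks like to the parser, the sketch below expands a comma-separated list of single ports and `start-end` ranges into an iterator. `PortRangeSketch.parse` is a hypothetical stand-in for `NetUtils.getPortRangeFromString`; the accepted syntax is an assumption based on the linked documentation, and the real utility may differ in details (for example, it may not build the whole list up front).
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class PortRangeSketch {
        // Expands definitions such as "0", "50100-50200" or "50100-50102,6123".
        static Iterator<Integer> parse(String definition) {
            List<Integer> ports = new ArrayList<>();
            for (String part : definition.split(",")) {
                String range = part.trim();
                int dash = range.indexOf('-');
                // A part without a dash is a single port; otherwise it is start-end.
                int start = Integer.parseInt(dash < 0 ? range : range.substring(0, dash).trim());
                int end = dash < 0 ? start : Integer.parseInt(range.substring(dash + 1).trim());
                if (start < 0 || end > 65535 || start > end) {
                    throw new IllegalArgumentException("Invalid port range definition: " + definition);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
            return ports.iterator();
        }
    }
    ```
    
    Malformed numbers surface as `NumberFormatException`, which is itself an `IllegalArgumentException`, so callers of this sketch only need to handle one exception type.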


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    @zentol thanks for your clarification. When I first joined the Flink community, most of the PRs were reviewed by you and Till. Both of you gave me a lot of suggestions and technical advice. What's more, you appear most frequently on the issues mailing list. So I always pinged you and Till; I didn't know this was a burden on you. Maybe there is an unfortunate pattern: the more often someone is seen, the more often they get pinged.
    
    From my point of view (as a contributor like any other), I don't know the committers' review plans. PRs that stay open longer also cost more (especially ones like this PR, which is already under review), because both contributors and committers have to look back into their context. I sometimes ping you because I think you are reviewing other PRs at that moment, so the ping might not interrupt your coding. And sometimes I don't need an immediate review: you could give an explanation or a time frame for the review, or ping another committer (who works with you) to review. In general, some effective feedback.
    
    Now I understand your position and the difficulties. Sorry about my behavior. You and the others are good.
    
    cc @StephanEwen 


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184032097
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    And we also have a requirement for this feature.


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    Committers usually have a lot of different responsibilities (releases, testing, helping users on mailing lists, working on roadmap features, etc.). All that takes a lot of time. Reviewing PRs is one important part, and we try to do this well, but with so many users now, it is not always perfect.
    
    One big problem is that very few committers actually take the time to look at external contributions.
    
    It might help to not always ping the same people (for example @zentol , @tillrohrmann , me, etc.) but some other committers as well. Here is a list of other committers, it is not quite complete, some newer ones are not yet listed: http://flink.apache.org/community.html#people
    
    Hope that helps you understand...


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184032815
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    that's...not my point.
    
    Currently:
    1. You take a port from the range,
    2. create a socket to check its availability,
    2.a if success, goto 3
    2.b if failure, goto 1
    3. close the socket,
    4. pass port to `createRpcService`
    4.a if success, exit
    4.b if failure, goto 1
    
    So, why do steps 2 and 3 at all?
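    
    The simplification implied by that question could be sketched as follows: hand each port straight to the service start-up and treat a bind failure as the signal to move on to the next port. `DirectBindSketch` and `startService` are hypothetical; a plain `ServerSocket` stands in for `createRpcService` so the example is self-contained, and unlike the Akka-based code it catches `BindException` directly rather than inspecting the exception's cause.
    
    ```java
    import java.io.IOException;
    import java.net.BindException;
    import java.net.ServerSocket;
    import java.util.Iterator;

    final class DirectBindSketch {
        // Hypothetical stand-in for createRpcService: binding is itself the availability check.
        static ServerSocket startService(int port) throws IOException {
            return new ServerSocket(port);
        }

        // Tries each port in turn without probing it with a throwaway socket first.
        static ServerSocket startOnFirstFreePort(Iterator<Integer> ports) throws IOException {
            while (ports.hasNext()) {
                int port = ports.next();
                try {
                    return startService(port);
                } catch (BindException e) {
                    // Port is taken: fall through the loop and try the next one.
                }
            }
            // If we come here, the port range is exhausted.
            throw new BindException("Could not start the service on any port in the given range");
        }
    }
    ```
    
    Dropping the probe also avoids the window between closing the probe socket and starting the service, during which another process could take the port.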


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @zentol @tzulitai 


---

[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5834
  
    cc @tillrohrmann : If you have time, please review this PR, thanks.


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184030144
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    I guess you took this code from `BootstrapTools#startActorSystem()`, but I'm wondering why we don't pass the port directly to `createRpcService`.


---

[GitHub] flink pull request #5834: [FLINK-9153] TaskManagerRunner should support rpc ...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5834#discussion_r184034826
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -355,13 +359,53 @@ public static RpcService createRpcService(
     				taskManagerHostname, taskManagerAddress.getHostAddress());
     		}
     
    -		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
     
    -		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
    -				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
    -				"use 0 to let the system choose port automatically.",
    -			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    --- End diff --
    
    It picks an available port and checks that it is not already in use. This logic follows the existing logic. And the JM and TM also support a port range.


---