You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/28 13:07:49 UTC

flink git commit: [FLINK-9153] TaskManagerRunner should support RPC port range.

Repository: flink
Updated Branches:
  refs/heads/master a47f51ffe -> 7c9044784


[FLINK-9153] TaskManagerRunner should support RPC port range.

This closes #5834.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c904478
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c904478
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c904478

Branch: refs/heads/master
Commit: 7c904478499d21fa0f12207fb84daffd749bec44
Parents: a47f51f
Author: yanghua <ya...@gmail.com>
Authored: Tue Apr 10 19:52:16 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 28 15:05:39 2018 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskManagerRunner.java | 36 ++++++++++++++++----
 1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c904478/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2a775bf..23fd29e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -51,22 +52,24 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.NetUtils;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is the executable entry point for the task manager in yarn or standalone mode.
@@ -389,13 +392,32 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 				taskManagerHostname, taskManagerAddress.getHostAddress());
 		}
 
-		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
 
-		checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
-				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
-				"use 0 to let the system choose port automatically.",
-			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+		// parse port range definition and create port iterator
+		Iterator<Integer> portsIterator;
+		try {
+			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+		} catch (Exception e) {
+			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+		}
+
+		while (portsIterator.hasNext()) {
+			try {
+				return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration);
+			}
+			catch (Exception e) {
+				// keep trying the next port if the cause is a netty channel exception or a bind exception
+				Throwable cause = e.getCause();
+				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+					cause instanceof java.net.BindException)) {
+					throw e;
+				} // else fall through the loop and try the next port
+			}
+		}
 
-		return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+		// if we come here, we have exhausted the port range
+		throw new BindException("Could not start task manager on any port in port range "
+			+ portRangeDefinition);
 	}
 }