You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/28 13:07:49 UTC
flink git commit: [FLINK-9153] TaskManagerRunner should support rpc port range.
Repository: flink
Updated Branches:
refs/heads/master a47f51ffe -> 7c9044784
[FLINK-9153] TaskManagerRunner should support rpc port range.
This closes #5834.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c904478
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c904478
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c904478
Branch: refs/heads/master
Commit: 7c904478499d21fa0f12207fb84daffd749bec44
Parents: a47f51f
Author: yanghua <ya...@gmail.com>
Authored: Tue Apr 10 19:52:16 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 28 15:05:39 2018 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskManagerRunner.java | 36 ++++++++++++++++----
1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c904478/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2a775bf..23fd29e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
@@ -51,22 +52,24 @@ import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.NetUtils;
import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* This class is the executable entry point for the task manager in yarn or standalone mode.
@@ -389,13 +392,32 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+ portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+ try {
+ return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration);
+ }
+ catch (Exception e) {
+ // we can continue to try if this contains a netty channel exception
+ Throwable cause = e.getCause();
+ if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+ cause instanceof java.net.BindException)) {
+ throw e;
+ } // else fall through the loop and try the next port
+ }
+ }
- return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+ // if we come here, we have exhausted the port range
+ throw new BindException("Could not start task manager on any port in port range "
+ + portRangeDefinition);
}
}