You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/11/13 07:27:28 UTC
zeppelin git commit: ZEPPELIN-3040. Allow to specify portRange for
interpreter process thrift service
Repository: zeppelin
Updated Branches:
refs/heads/master 3b1a03f38 -> 382479fd5
ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service
### What is this PR for?
This PR is trying to add new configuration `zeppelin.interpreter.portRange` which control the portRange of interpreter process. This is required by some users for security reason.
### What type of PR is it?
[Improvement | Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3040
### How should this be tested?
Manually test. Set zeppelin.interpreter.portRange and launch python interpreter, verify it is in the proper portRange.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2661 from zjffdu/ZEPPELIN-3040 and squashes the following commits:
a87c425 [Jeff Zhang] address comments
7e885bd [Jeff Zhang] ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/382479fd
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/382479fd
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/382479fd
Branch: refs/heads/master
Commit: 382479fd502b5872fe0f7914d9901c4473069cc2
Parents: 3b1a03f
Author: Jeff Zhang <zj...@apache.org>
Authored: Sun Nov 12 09:18:41 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Nov 13 15:27:17 2017 +0800
----------------------------------------------------------------------
bin/interpreter.sh | 15 +++++----
.../zeppelin/helium/ZeppelinDevServer.java | 2 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 5 +++
.../remote/RemoteInterpreterServer.java | 28 ++++++++++-------
.../remote/RemoteInterpreterUtils.java | 32 +++++++++++++-------
.../remote/RemoteInterpreterServerTest.java | 4 +--
.../remote/RemoteInterpreterUtilsTest.java | 10 +++---
.../launcher/ShellScriptLauncher.java | 2 +-
.../remote/RemoteInterpreterManagedProcess.java | 27 +++++++++--------
9 files changed, 76 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 4e983ec..458ffc0 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -20,10 +20,10 @@ bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)
function usage() {
- echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
+ echo "usage) $0 -p <port> -r <intp_port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}
-while getopts "hc:p:d:l:v:u:g:" o; do
+while getopts "hc:p:r:d:l:v:u:g:" o; do
case ${o} in
h)
usage
@@ -36,7 +36,10 @@ while getopts "hc:p:d:l:v:u:g:" o; do
CALLBACK_HOST=${OPTARG} # This will be used callback host
;;
p)
- PORT=${OPTARG} # This will be used callback port
+ PORT=${OPTARG} # This will be used for callback port
+ ;;
+ r)
+ INTP_PORT=${OPTARG} # This will be used for interpreter process port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
@@ -204,12 +207,12 @@ fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
- INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
+ INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
else
- INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
+ INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi
else
- INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
+ INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
----------------------------------------------------------------------
diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
index 3a5199d..607839e 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
@@ -39,7 +39,7 @@ public class ZeppelinDevServer extends
private DevInterpreter interpreter = null;
private InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException, IOException {
- super(null, port);
+ super(null, port, ":");
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index f280475..1bc242d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -488,6 +488,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
}
+ public String getInterpreterPortRange() {
+ return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
+ }
+
public boolean isWindowsPath(String path){
return path.matches("^[A-Za-z]:\\\\.*");
}
@@ -705,6 +709,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
+ ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index cb0488c..86f35c6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -132,19 +132,19 @@ public class RemoteInterpreterServer
private boolean isTest;
- public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
- TTransportException {
- this(callbackHost, port, false);
+ public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange)
+ throws IOException, TTransportException {
+ this(callbackHost, callbackPort, portRange, false);
}
- public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
- throws TTransportException, IOException {
+ public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange,
+ boolean isTest) throws TTransportException, IOException {
if (null != callbackHost) {
this.callbackHost = callbackHost;
- this.callbackPort = port;
+ this.callbackPort = callbackPort;
} else {
// DevInterpreter
- this.port = port;
+ this.port = callbackPort;
}
this.isTest = isTest;
@@ -152,14 +152,16 @@ public class RemoteInterpreterServer
TServerSocket serverTransport;
if (null == callbackHost) {
// Dev Interpreter
- serverTransport = new TServerSocket(port);
+ serverTransport = new TServerSocket(callbackPort);
} else {
- this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
+ this.port = serverTransport.getServerSocket().getLocalPort();
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
- serverTransport = new TServerSocket(this.port);
+ logger.info("Launching ThriftServer at " + this.host + ":" + this.port);
}
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
+ logger.info("Starting remote interpreter server on port {}", port);
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
}
@@ -254,12 +256,16 @@ public class RemoteInterpreterServer
throws TTransportException, InterruptedException, IOException {
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
+ String portRange = ":";
if (args.length > 0) {
callbackHost = args[0];
port = Integer.parseInt(args[1]);
+ if (args.length > 2) {
+ portRange = args[2];
+ }
}
RemoteInterpreterServer remoteInterpreterServer =
- new RemoteInterpreterServer(callbackHost, port);
+ new RemoteInterpreterServer(callbackHost, port, portRange);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 835199a..223588f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -29,11 +29,15 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +52,12 @@ public class RemoteInterpreterUtils {
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- return findRandomAvailablePortOnAllLocalInterfaces(":");
+ int port;
+ try (ServerSocket socket = new ServerSocket(0);) {
+ port = socket.getLocalPort();
+ socket.close();
+ }
+ return port;
}
/**
@@ -58,21 +67,22 @@ public class RemoteInterpreterUtils {
* @return
* @throws IOException
*/
- public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
+ public static TServerSocket createTServerSocket(String portRange)
throws IOException {
+ TServerSocket tSocket = null;
// ':' is the default value which means no constraints on the portRange
- if (portRange == null || portRange.equals(":")) {
- int port;
- try (ServerSocket socket = new ServerSocket(0);) {
- port = socket.getLocalPort();
- socket.close();
+ if (StringUtils.isBlank(portRange) || portRange.equals(":")) {
+ try {
+ tSocket = new TServerSocket(0);
+ return tSocket;
+ } catch (TTransportException e) {
+ throw new IOException("Fail to create TServerSocket", e);
}
- return port;
}
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
int start = 1024;
- int end = 49151;
+ int end = 65535;
String[] ports = portRange.split(":", -1);
if (!ports[0].isEmpty()) {
start = Integer.parseInt(ports[0]);
@@ -82,8 +92,8 @@ public class RemoteInterpreterUtils {
}
for (int i = start; i <= end; ++i) {
try {
- ServerSocket socket = new ServerSocket(i);
- return socket.getLocalPort();
+ tSocket = new TServerSocket(i);
+ return tSocket;
} catch (Exception e) {
// ignore this
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index b2fcae1..1cb2cb6 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
assertEquals(false, server.isRunning());
server.start();
@@ -91,7 +91,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
assertEquals(false, server.isRunning());
server.start();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
index afbbcbd..8eeb85a 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -26,17 +26,17 @@ import static org.junit.Assert.assertTrue;
public class RemoteInterpreterUtilsTest {
@Test
- public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+ public void testCreateTServerSocket() throws IOException {
+ assertTrue(RemoteInterpreterUtils.createTServerSocket(":").getServerSocket().getLocalPort() > 0);
String portRange = ":30000";
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
+ assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() <= 30000);
portRange = "30000:";
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
+ assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() >= 30000);
portRange = "30000:40000";
- int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
+ int port = RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort();
assertTrue(port >= 30000 && port <= 40000);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
index 0966ec5..8c86129 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
@@ -64,7 +64,7 @@ public class ShellScriptLauncher extends InterpreterLauncher {
+ context.getInterpreterSettingId();
return new RemoteInterpreterManagedProcess(
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
- zConf.getCallbackPortRange(),
+ zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(), connectTimeout, name);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 9f8f346..27e826c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -52,11 +52,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
RemoteInterpreterManagedProcess.class);
private final String interpreterRunner;
- private final String portRange;
+ private final String callbackPortRange;
+ private final String interpreterPortRange;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
private AtomicBoolean running = new AtomicBoolean(false);
- TServer callbackServer;
+ private TServer callbackServer;
private String host = null;
private int port = -1;
private final String interpreterDir;
@@ -67,7 +68,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public RemoteInterpreterManagedProcess(
String intpRunner,
- String portRange,
+ String callbackPortRange,
+ String interpreterPortRange,
String intpDir,
String localRepoDir,
Map<String, String> env,
@@ -75,7 +77,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
String interpreterSettingName) {
super(connectTimeout);
this.interpreterRunner = intpRunner;
- this.portRange = portRange;
+ this.callbackPortRange = callbackPortRange;
+ this.interpreterPortRange = interpreterPortRange;
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@@ -84,7 +87,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
@Override
public String getHost() {
- return "localhost";
+ return host;
}
@Override
@@ -97,11 +100,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
// start server process
final String callbackHost;
final int callbackPort;
+ TServerSocket tSocket = null;
try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
- logger.info("Choose port {} for RemoteInterpreterProcess", port);
+ tSocket = RemoteInterpreterUtils.createTServerSocket(callbackPortRange);
+ callbackPort = tSocket.getServerSocket().getLocalPort();
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
- callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new RuntimeException(e1);
}
@@ -109,12 +112,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
try {
callbackServer = new TThreadPoolServer(
- new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
+ new TThreadPoolServer.Args(tSocket).processor(
new RemoteInterpreterCallbackService.Processor<>(
new RemoteInterpreterCallbackService.Iface() {
@Override
public void callback(CallbackInfo callbackInfo) throws TException {
- logger.info("Registered: {}", callbackInfo);
+ logger.info("RemoteInterpreterServer Registered: {}", callbackInfo);
host = callbackInfo.getHost();
port = callbackInfo.getPort();
running.set(true);
@@ -145,8 +148,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
Thread.sleep(500);
}
logger.debug("callbackServer is serving now");
- } catch (TTransportException e) {
- logger.error("callback server error.", e);
} catch (InterruptedException e) {
logger.warn("", e);
}
@@ -158,6 +159,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
cmdLine.addArgument(callbackHost, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(callbackPort), false);
+ cmdLine.addArgument("-r", false);
+ cmdLine.addArgument(interpreterPortRange, false);
if (isUserImpersonate && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);