You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/08 03:10:57 UTC
[zeppelin] branch master updated: [ZEPPELIN-5005]. Unable to get
job progress in the 8th running paragraph
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 14d747a [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
14d747a is described below
commit 14d747a23a33303e9edef744588ef4a3b4972b0c
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Aug 19 23:17:03 2020 +0800
[ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
### What is this PR for?
A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html
### What type of PR is it?
[Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN/
* Put link here, and add [ZEPPELIN-*Jira number*] in PR title, e.g. [ZEPPELIN-533]
### How should this be tested?
* First time? Set up Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update?
* Are there breaking changes for older versions?
* Does this need documentation?
Author: Jeff Zhang <zj...@apache.org>
Closes #3896 from zjffdu/ZEPPELIN-5005 and squashes the following commits:
8750c77b3 [Jeff Zhang] [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
---
.../org/apache/zeppelin/conf/ZeppelinConfiguration.java | 2 +-
.../interpreter/launcher/InterpreterLauncher.java | 8 ++++++++
.../zeppelin/interpreter/remote/PooledRemoteClient.java | 15 ++++++++++++---
.../interpreter/launcher/ClusterInterpreterLauncher.java | 4 ++++
.../interpreter/launcher/ClusterInterpreterProcess.java | 2 ++
.../interpreter/launcher/DockerInterpreterLauncher.java | 2 +-
.../interpreter/launcher/DockerInterpreterProcess.java | 5 +++--
.../launcher/DockerInterpreterProcessTest.java | 4 ++--
.../launcher/K8sRemoteInterpreterProcess.java | 3 ++-
.../launcher/K8sStandardInterpreterLauncher.java | 1 +
.../launcher/K8sRemoteInterpreterProcessTest.java | 9 +++++++++
.../interpreter/launcher/YarnInterpreterLauncher.java | 4 ++--
.../launcher/YarnRemoteInterpreterProcess.java | 5 +++--
.../apache/zeppelin/interpreter/InterpreterSetting.java | 10 ++++------
.../launcher/StandardInterpreterLauncher.java | 4 +++-
.../zeppelin/interpreter/recovery/RecoveryUtils.java | 16 +++++++++++++---
.../remote/RemoteInterpreterManagedProcess.java | 3 ++-
.../interpreter/remote/RemoteInterpreterProcess.java | 3 ++-
.../remote/RemoteInterpreterRunningProcess.java | 3 ++-
19 files changed, 76 insertions(+), 27 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index c4a249f..62265e0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -924,7 +924,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo",
"https://repo1.maven.org/maven2/"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 60000),
- ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
+ ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE("zeppelin.interpreter.connection.poolsize", 10),
ZEPPELIN_INTERPRETER_GROUP_DEFAULT("zeppelin.interpreter.group.default", "spark"),
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_INTERPRETER_INCLUDES("zeppelin.interpreter.include", ""),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 0d90fc4..4a569ca 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -25,6 +25,8 @@ import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
+
/**
* Component to Launch interpreter process.
*/
@@ -62,6 +64,12 @@ public abstract class InterpreterLauncher {
return connectTimeout;
}
+ protected int getConnectPoolSize() {
+ return Integer.parseInt(properties.getProperty(
+ ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+ ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + ""));
+ }
+
public static String escapeSpecialCharacter(String command) {
StringBuilder builder = new StringBuilder();
for (char c : command.toCharArray()) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
index 39dd63d..a219036 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.slf4j.Logger;
@@ -36,13 +37,21 @@ public class PooledRemoteClient<T extends TServiceClient> {
private GenericObjectPool<T> clientPool;
private RemoteClientFactory<T> remoteClientFactory;
- public PooledRemoteClient(SupplierWithIO<T> supplier) {
+ public PooledRemoteClient(SupplierWithIO<T> supplier, int connectionPoolSize) {
this.remoteClientFactory = new RemoteClientFactory<>(supplier);
- this.clientPool = new GenericObjectPool<>(remoteClientFactory);
+ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+ poolConfig.setMaxTotal(connectionPoolSize);
+ poolConfig.setMaxIdle(connectionPoolSize);
+ this.clientPool = new GenericObjectPool<>(remoteClientFactory, poolConfig);
+ }
+
+ public PooledRemoteClient(SupplierWithIO<T> supplier) {
+ this(supplier, 10);
}
public synchronized T getClient() throws Exception {
- return clientPool.borrowObject(5_000);
+ T t = clientPool.borrowObject(5_000);
+ return t;
}
public void shutdown() {
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index f392b3d..397f6ab 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -80,6 +80,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
+ getConnectPoolSize(),
context.getIntpEventServerHost(),
context.getIntpEventServerPort(),
intpTserverHost,
@@ -155,6 +156,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
+ getConnectPoolSize(),
context.getIntpEventServerHost(),
context.getIntpEventServerPort(),
intpTserverHost,
@@ -245,6 +247,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
String intpSetGroupName = context.getInterpreterSettingGroup();
String intpSetName = context.getInterpreterSettingName();
int connectTimeout = getConnectTimeout();
+ int connectionPoolSize = getConnectPoolSize();
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ context.getInterpreterSettingId();
@@ -257,6 +260,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
localRepoPath,
buildEnvFromProperties(context),
connectTimeout,
+ connectionPoolSize,
intpSetName,
context.getInterpreterGroupId(),
option.isUserImpersonate());
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index e7960ad..c9ae7f4 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -17,6 +17,7 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
String localRepoDir,
Map<String, String> env,
int connectTimeout,
+ int connectionPoolSize,
String interpreterSettingName,
String interpreterGroupId,
boolean isUserImpersonated) {
@@ -29,6 +30,7 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
localRepoDir,
env,
connectTimeout,
+ connectionPoolSize,
interpreterSettingName,
interpreterGroupId,
isUserImpersonated);
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
index ded6885..a81768c 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
@@ -73,7 +73,7 @@ public class DockerInterpreterLauncher extends InterpreterLauncher {
env,
context.getIntpEventServerHost(),
context.getIntpEventServerPort(),
- connectTimeout);
+ connectTimeout, 10);
}
boolean isSpark() {
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 7e1bbb5..3221d8e 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -117,9 +117,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
Map<String, String> envs,
String intpEventServerHost,
int intpEventServerPort,
- int connectTimeout
+ int connectTimeout,
+ int connectionPoolSize
) {
- super(connectTimeout, intpEventServerHost, intpEventServerPort);
+ super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
this.containerImage = containerImage;
this.interpreterGroupId = interpreterGroupId;
diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
index a6863c8..4a7a39d 100644
--- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
@@ -91,7 +91,7 @@ public class DockerInterpreterProcessTest {
envs,
"zeppelin.server.hostname",
12320,
- 5000);
+ 5000, 10);
assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home");
assertEquals(intp.uploadLocalLibToContainter, false);
@@ -117,7 +117,7 @@ public class DockerInterpreterProcessTest {
envs,
"zeppelin.server.hostname",
12320,
- 5000);
+ 5000, 10);
Properties dockerProperties = intp.getTemplateBindings();
assertEquals(dockerProperties.size(), 10);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index b7a4b16..989bced 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -75,9 +75,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
boolean portForward,
String sparkImage,
int connectTimeout,
+ int connectionPoolSize,
boolean isUserImpersonatedForSpark
) {
- super(connectTimeout, intpEventServerHost, intpEventServerPort);
+ super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
this.client = client;
this.namespace = namespace;
this.specTemplates = specTemplates;
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index bb4b792..6eeb915 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -152,6 +152,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
zConf.getK8sPortForward(),
zConf.getK8sSparkContainerImage(),
getConnectTimeout(),
+ getConnectPoolSize(),
isUserImpersonateForSparkInterpreter(context));
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index ad4d5fd..b85ade0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -57,6 +57,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// then
@@ -85,6 +86,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
@@ -118,6 +120,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// when
@@ -169,6 +172,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// when
@@ -219,6 +223,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
true);
// when
@@ -268,6 +273,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
true);
// when
@@ -306,6 +312,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// when non template url
@@ -349,6 +356,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// when
@@ -384,6 +392,7 @@ public class K8sRemoteInterpreterProcessTest {
false,
"spark-container:1.0",
10,
+ 10,
false);
// when
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
index 8eb4db7..f07a1ee 100644
--- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
@@ -42,13 +42,13 @@ public class YarnInterpreterLauncher extends InterpreterLauncher {
public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
this.properties = context.getProperties();
- int connectTimeout = getConnectTimeout();
return new YarnRemoteInterpreterProcess(
context,
properties,
buildEnvFromProperties(context),
- connectTimeout);
+ getConnectTimeout(),
+ getConnectPoolSize());
}
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
index e095dd9..4dca26e 100644
--- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
@@ -99,8 +99,9 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
InterpreterLaunchContext launchContext,
Properties properties,
Map<String, String> envs,
- int connectTimeout) {
- super(connectTimeout, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
+ int connectTimeout,
+ int connectionPoolSize) {
+ super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
this.zConf = ZeppelinConfiguration.create();
this.launchContext = launchContext;
this.properties = properties;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 6ef5e28..1a2e05e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -56,8 +56,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,7 +70,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
import static org.apache.zeppelin.util.IdHashes.generateId;
@@ -663,9 +661,9 @@ public class InterpreterSetting {
conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + "");
}
- if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) {
- jProperties.setProperty("zeppelin.interpreter.max.poolsize",
- conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + "");
+ if (!jProperties.containsKey(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName())) {
+ jProperties.setProperty(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+ conf.getInt(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE) + "");
}
String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index bc5034a..aff3840 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -53,12 +53,14 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
String groupName = context.getInterpreterSettingGroup();
String name = context.getInterpreterSettingName();
int connectTimeout = getConnectTimeout();
+ int connectionPoolSize = getConnectPoolSize();
if (option.isExistingProcess()) {
return new RemoteInterpreterRunningProcess(
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
+ connectionPoolSize,
context.getIntpEventServerHost(),
context.getIntpEventServerPort(),
option.getHost(),
@@ -72,7 +74,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
- buildEnvFromProperties(context), connectTimeout, name,
+ buildEnvFromProperties(context), connectTimeout, connectionPoolSize, name,
context.getInterpreterGroupId(), option.isUserImpersonate());
}
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
index 83db705..544f49b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
@@ -33,6 +33,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
public class RecoveryUtils {
@@ -73,16 +76,23 @@ public class RecoveryUtils {
InterpreterSettingManager interpreterSettingManager,
ZeppelinConfiguration zConf) {
+ int connectTimeout =
+ zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+ Properties interpreterProperties = interpreterSettingManager.getByName(interpreterSettingName).getJavaProperties();
+ int connectionPoolSize = Integer.parseInt(interpreterProperties.getProperty(
+ ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+ ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + ""));
+
Map<String, InterpreterClient> clients = new HashMap<>();
+
if (!StringUtils.isBlank(recoveryData)) {
for (String line : recoveryData.split(System.lineSeparator())) {
String[] tokens = line.split("\t");
String interpreterGroupId = tokens[0];
String[] hostPort = tokens[1].split(":");
- int connectTimeout =
- zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+
RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
- interpreterSettingName, interpreterGroupId, connectTimeout,
+ interpreterSettingName, interpreterGroupId, connectTimeout, connectionPoolSize,
interpreterSettingManager.getInterpreterEventServer().getHost(),
interpreterSettingManager.getInterpreterEventServer().getPort(),
hostPort[0], Integer.parseInt(hostPort[1]), true);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index a4a5df3..e9af2e1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -63,10 +63,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
String localRepoDir,
Map<String, String> env,
int connectTimeout,
+ int connectionPoolSize,
String interpreterSettingName,
String interpreterGroupId,
boolean isUserImpersonated) {
- super(connectTimeout, intpEventServerHost, intpEventServerPort);
+ super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
this.interpreterRunner = intpRunner;
this.interpreterPortRange = interpreterPortRange;
this.env = env;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index e3a81ac..616bfc3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -41,6 +41,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
private PooledRemoteClient<Client> remoteClient;
public RemoteInterpreterProcess(int connectTimeout,
+ int connectionPoolSize,
String intpEventServerHost,
int intpEventServerPort) {
this.connectTimeout = connectTimeout;
@@ -55,7 +56,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
}
TProtocol protocol = new TBinaryProtocol(transport);
return new Client(protocol);
- });
+ }, connectionPoolSize);
}
public int getConnectTimeout() {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index 053a72c..c956323 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -36,12 +36,13 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
String interpreterSettingName,
String interpreterGroupId,
int connectTimeout,
+ int connectionPoolSize,
String intpEventServerHost,
int intpEventServerPort,
String host,
int port,
boolean isRecovery) {
- super(connectTimeout, intpEventServerHost, intpEventServerPort);
+ super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
this.interpreterSettingName = interpreterSettingName;
this.interpreterGroupId = interpreterGroupId;
this.host = host;