You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/08 03:11:19 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 4c7d3e0  [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
4c7d3e0 is described below

commit 4c7d3e0e18ca3bfbfe043107a63c56263116e2fb
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Aug 19 23:17:03 2020 +0800

    [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
    
    ### What is this PR for?
    A few sentences describing the overall goals of the pull request's commits.
    First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html
    
    ### What type of PR is it?
    [Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN/
    * Put link here, and add [ZEPPELIN-*Jira number*] in PR title, eg. [ZEPPELIN-533]
    
    ### How should this be tested?
    * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update?
    * Is there breaking changes for older versions?
    * Does this needs documentation?
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3896 from zjffdu/ZEPPELIN-5005 and squashes the following commits:
    
    8750c77b3 [Jeff Zhang] [ZEPPELIN-5005]. Unable to get job progress in the 8th running paragraph
    
    (cherry picked from commit 14d747a23a33303e9edef744588ef4a3b4972b0c)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../org/apache/zeppelin/conf/ZeppelinConfiguration.java  |  2 +-
 .../interpreter/launcher/InterpreterLauncher.java        |  8 ++++++++
 .../zeppelin/interpreter/remote/PooledRemoteClient.java  | 15 ++++++++++++---
 .../interpreter/launcher/ClusterInterpreterLauncher.java |  4 ++++
 .../interpreter/launcher/ClusterInterpreterProcess.java  |  2 ++
 .../interpreter/launcher/DockerInterpreterLauncher.java  |  2 +-
 .../interpreter/launcher/DockerInterpreterProcess.java   |  5 +++--
 .../launcher/DockerInterpreterProcessTest.java           |  4 ++--
 .../launcher/K8sRemoteInterpreterProcess.java            |  3 ++-
 .../launcher/K8sStandardInterpreterLauncher.java         |  1 +
 .../launcher/K8sRemoteInterpreterProcessTest.java        |  9 +++++++++
 .../interpreter/launcher/YarnInterpreterLauncher.java    |  4 ++--
 .../launcher/YarnRemoteInterpreterProcess.java           |  5 +++--
 .../apache/zeppelin/interpreter/InterpreterSetting.java  | 10 ++++------
 .../launcher/StandardInterpreterLauncher.java            |  4 +++-
 .../zeppelin/interpreter/recovery/RecoveryUtils.java     | 16 +++++++++++++---
 .../remote/RemoteInterpreterManagedProcess.java          |  3 ++-
 .../interpreter/remote/RemoteInterpreterProcess.java     |  3 ++-
 .../remote/RemoteInterpreterRunningProcess.java          |  3 ++-
 19 files changed, 76 insertions(+), 27 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 1db8941..27f49d0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -924,7 +924,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo",
         "https://repo1.maven.org/maven2/"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 60000),
-    ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
+    ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE("zeppelin.interpreter.connection.poolsize", 10),
     ZEPPELIN_INTERPRETER_GROUP_DEFAULT("zeppelin.interpreter.group.default", "spark"),
     ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
     ZEPPELIN_INTERPRETER_INCLUDES("zeppelin.interpreter.include", ""),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 0d90fc4..4a569ca 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -25,6 +25,8 @@ import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
+
 /**
  * Component to Launch interpreter process.
  */
@@ -62,6 +64,12 @@ public abstract class InterpreterLauncher {
     return connectTimeout;
   }
 
+  protected int getConnectPoolSize() {
+    return Integer.parseInt(properties.getProperty(
+            ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+            ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + ""));
+  }
+
   public static String escapeSpecialCharacter(String command) {
     StringBuilder builder = new StringBuilder();
     for (char c : command.toCharArray()) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
index 39dd63d..a219036 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
 import org.slf4j.Logger;
@@ -36,13 +37,21 @@ public class PooledRemoteClient<T extends TServiceClient> {
   private GenericObjectPool<T> clientPool;
   private RemoteClientFactory<T> remoteClientFactory;
 
-  public PooledRemoteClient(SupplierWithIO<T> supplier) {
+  public PooledRemoteClient(SupplierWithIO<T> supplier, int connectionPoolSize) {
     this.remoteClientFactory = new RemoteClientFactory<>(supplier);
-    this.clientPool = new GenericObjectPool<>(remoteClientFactory);
+    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+    poolConfig.setMaxTotal(connectionPoolSize);
+    poolConfig.setMaxIdle(connectionPoolSize);
+    this.clientPool = new GenericObjectPool<>(remoteClientFactory, poolConfig);
+  }
+
+  public PooledRemoteClient(SupplierWithIO<T> supplier) {
+    this(supplier, 10);
   }
 
   public synchronized T getClient() throws Exception {
-    return clientPool.borrowObject(5_000);
+    T t = clientPool.borrowObject(5_000);
+    return t;
   }
 
   public void shutdown() {
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index f392b3d..397f6ab 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -80,6 +80,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
                 context.getInterpreterSettingName(),
                 context.getInterpreterGroupId(),
                 connectTimeout,
+                getConnectPoolSize(),
                 context.getIntpEventServerHost(),
                 context.getIntpEventServerPort(),
                 intpTserverHost,
@@ -155,6 +156,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
                 context.getInterpreterSettingName(),
                 context.getInterpreterGroupId(),
                 connectTimeout,
+                getConnectPoolSize(),
                 context.getIntpEventServerHost(),
                 context.getIntpEventServerPort(),
                 intpTserverHost,
@@ -245,6 +247,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
       String intpSetGroupName = context.getInterpreterSettingGroup();
       String intpSetName = context.getInterpreterSettingName();
       int connectTimeout = getConnectTimeout();
+      int connectionPoolSize = getConnectPoolSize();
       String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
           + context.getInterpreterSettingId();
 
@@ -257,6 +260,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
           localRepoPath,
           buildEnvFromProperties(context),
           connectTimeout,
+          connectionPoolSize,
           intpSetName,
           context.getInterpreterGroupId(),
           option.isUserImpersonate());
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index e7960ad..c9ae7f4 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -17,6 +17,7 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
       String localRepoDir,
       Map<String, String> env,
       int connectTimeout,
+      int connectionPoolSize,
       String interpreterSettingName,
       String interpreterGroupId,
       boolean isUserImpersonated) {
@@ -29,6 +30,7 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
       localRepoDir,
       env,
       connectTimeout,
+      connectionPoolSize,
       interpreterSettingName,
       interpreterGroupId,
       isUserImpersonated);
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
index ded6885..a81768c 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
@@ -73,7 +73,7 @@ public class DockerInterpreterLauncher extends InterpreterLauncher {
         env,
         context.getIntpEventServerHost(),
         context.getIntpEventServerPort(),
-        connectTimeout);
+        connectTimeout, 10);
   }
 
   boolean isSpark() {
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 7e1bbb5..3221d8e 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -117,9 +117,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
       Map<String, String> envs,
       String intpEventServerHost,
       int intpEventServerPort,
-      int connectTimeout
+      int connectTimeout,
+      int connectionPoolSize
   ) {
-    super(connectTimeout, intpEventServerHost, intpEventServerPort);
+    super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
 
     this.containerImage = containerImage;
     this.interpreterGroupId = interpreterGroupId;
diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
index a6863c8..4a7a39d 100644
--- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
@@ -91,7 +91,7 @@ public class DockerInterpreterProcessTest {
         envs,
         "zeppelin.server.hostname",
         12320,
-        5000);
+        5000, 10);
 
     assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home");
     assertEquals(intp.uploadLocalLibToContainter, false);
@@ -117,7 +117,7 @@ public class DockerInterpreterProcessTest {
         envs,
         "zeppelin.server.hostname",
         12320,
-        5000);
+        5000, 10);
 
     Properties dockerProperties = intp.getTemplateBindings();
     assertEquals(dockerProperties.size(), 10);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index b7a4b16..989bced 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -75,9 +75,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           boolean portForward,
           String sparkImage,
           int connectTimeout,
+          int connectionPoolSize,
           boolean isUserImpersonatedForSpark
   ) {
-    super(connectTimeout, intpEventServerHost, intpEventServerPort);
+    super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
     this.client = client;
     this.namespace = namespace;
     this.specTemplates = specTemplates;
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 799a91c..55e27dc 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -152,6 +152,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
             zConf.getK8sPortForward(),
             zConf.getK8sSparkContainerImage(),
             getConnectTimeout(),
+            getConnectPoolSize(),
             isUserImpersonateForSparkInterpreter(context));
   }
 
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index ad4d5fd..b85ade0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -57,6 +57,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // then
@@ -85,6 +86,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
 
@@ -118,6 +120,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // when
@@ -169,6 +172,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // when
@@ -219,6 +223,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         true);
 
     // when
@@ -268,6 +273,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         true);
 
     // when
@@ -306,6 +312,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // when non template url
@@ -349,6 +356,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // when
@@ -384,6 +392,7 @@ public class K8sRemoteInterpreterProcessTest {
         false,
         "spark-container:1.0",
         10,
+        10,
         false);
 
     // when
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
index 8eb4db7..f07a1ee 100644
--- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
@@ -42,13 +42,13 @@ public class YarnInterpreterLauncher extends InterpreterLauncher {
   public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
     this.properties = context.getProperties();
-    int connectTimeout = getConnectTimeout();
 
     return new YarnRemoteInterpreterProcess(
             context,
             properties,
             buildEnvFromProperties(context),
-            connectTimeout);
+            getConnectTimeout(),
+            getConnectPoolSize());
   }
 
   protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
index e095dd9..4dca26e 100644
--- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
@@ -99,8 +99,9 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
           InterpreterLaunchContext launchContext,
           Properties properties,
           Map<String, String> envs,
-          int connectTimeout) {
-    super(connectTimeout, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
+          int connectTimeout,
+          int connectionPoolSize) {
+    super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
     this.zConf = ZeppelinConfiguration.create();
     this.launchContext = launchContext;
     this.properties = properties;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 6ef5e28..1a2e05e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -56,8 +56,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -72,7 +70,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
 import static org.apache.zeppelin.util.IdHashes.generateId;
 
@@ -663,9 +661,9 @@ public class InterpreterSetting {
           conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + "");
     }
 
-    if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) {
-      jProperties.setProperty("zeppelin.interpreter.max.poolsize",
-          conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + "");
+    if (!jProperties.containsKey(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName())) {
+      jProperties.setProperty(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+          conf.getInt(ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE) + "");
     }
 
     String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index bc5034a..aff3840 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -53,12 +53,14 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
     String groupName = context.getInterpreterSettingGroup();
     String name = context.getInterpreterSettingName();
     int connectTimeout = getConnectTimeout();
+    int connectionPoolSize = getConnectPoolSize();
 
     if (option.isExistingProcess()) {
       return new RemoteInterpreterRunningProcess(
           context.getInterpreterSettingName(),
           context.getInterpreterGroupId(),
           connectTimeout,
+          connectionPoolSize,
           context.getIntpEventServerHost(),
           context.getIntpEventServerPort(),
           option.getHost(),
@@ -72,7 +74,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
           context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
-          buildEnvFromProperties(context), connectTimeout, name,
+          buildEnvFromProperties(context), connectTimeout, connectionPoolSize, name,
           context.getInterpreterGroupId(), option.isUserImpersonate());
     }
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
index 83db705..544f49b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
@@ -33,6 +33,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE;
 
 public class RecoveryUtils {
 
@@ -73,16 +76,23 @@ public class RecoveryUtils {
                                                                        InterpreterSettingManager interpreterSettingManager,
                                                                        ZeppelinConfiguration zConf) {
 
+    int connectTimeout =
+            zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    Properties interpreterProperties =  interpreterSettingManager.getByName(interpreterSettingName).getJavaProperties();
+    int connectionPoolSize = Integer.parseInt(interpreterProperties.getProperty(
+            ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(),
+            ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + ""));
+
     Map<String, InterpreterClient> clients = new HashMap<>();
+
     if (!StringUtils.isBlank(recoveryData)) {
       for (String line : recoveryData.split(System.lineSeparator())) {
         String[] tokens = line.split("\t");
         String interpreterGroupId = tokens[0];
         String[] hostPort = tokens[1].split(":");
-        int connectTimeout =
-                zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+
         RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
-                interpreterSettingName, interpreterGroupId, connectTimeout,
+                interpreterSettingName, interpreterGroupId, connectTimeout, connectionPoolSize,
                 interpreterSettingManager.getInterpreterEventServer().getHost(),
                 interpreterSettingManager.getInterpreterEventServer().getPort(),
                 hostPort[0], Integer.parseInt(hostPort[1]), true);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index a4a5df3..e9af2e1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -63,10 +63,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
       String localRepoDir,
       Map<String, String> env,
       int connectTimeout,
+      int connectionPoolSize,
       String interpreterSettingName,
       String interpreterGroupId,
       boolean isUserImpersonated) {
-    super(connectTimeout, intpEventServerHost, intpEventServerPort);
+    super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
     this.interpreterRunner = intpRunner;
     this.interpreterPortRange = interpreterPortRange;
     this.env = env;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index e3a81ac..616bfc3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -41,6 +41,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
   private PooledRemoteClient<Client> remoteClient;
 
   public RemoteInterpreterProcess(int connectTimeout,
+                                  int connectionPoolSize,
                                   String intpEventServerHost,
                                   int intpEventServerPort) {
     this.connectTimeout = connectTimeout;
@@ -55,7 +56,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
       }
       TProtocol protocol = new  TBinaryProtocol(transport);
       return new Client(protocol);
-    });
+    }, connectionPoolSize);
   }
 
   public int getConnectTimeout() {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index 053a72c..c956323 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -36,12 +36,13 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
       String interpreterSettingName,
       String interpreterGroupId,
       int connectTimeout,
+      int connectionPoolSize,
       String intpEventServerHost,
       int intpEventServerPort,
       String host,
       int port,
       boolean isRecovery) {
-    super(connectTimeout, intpEventServerHost, intpEventServerPort);
+    super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
     this.interpreterSettingName = interpreterSettingName;
     this.interpreterGroupId = interpreterGroupId;
     this.host = host;