You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/17 15:19:14 UTC

[zeppelin] branch master updated: [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d86db  [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
04d86db is described below

commit 04d86db047d46da37a6310969cf008c34ceb0f23
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Dec 7 10:28:57 2020 +0800

    [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
    
    ### What is this PR for?
    
    The connection pool size property is only set on the zeppelin-server side, but not on the interpreter process side. This PR fixes the issue by recreating RemoteInterpreterEventClient after it gets the Zeppelin site configuration in the init method.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/projects/ZEPPELIN/issues/ZEPPELIN-5151
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3988 from zjffdu/ZEPPELIN-5151 and squashes the following commits:
    
    1b49c6f9c [Jeff Zhang] [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
---
 .../remote/RemoteInterpreterEventClient.java       |  4 ++--
 .../remote/RemoteInterpreterServer.java            | 18 ++++++++++++------
 .../interpreter/InterpreterSettingManager.java     | 22 ++++++++++++----------
 3 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 3eed43d..c2f38a3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -64,7 +64,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
   private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient;
   private String intpGroupId;
 
-  public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) {
+  public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort, int connectionPoolSize) {
     this.remoteClient = new PooledRemoteClient<>(() -> {
       TSocket transport = new TSocket(intpEventHost, intpEventPort);
       try {
@@ -74,7 +74,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
       }
       TProtocol protocol = new TBinaryProtocol(transport);
       return new RemoteInterpreterEventService.Client(protocol);
-    });
+    }, connectionPoolSize);
   }
 
   public <R> R callRemoteFunction(PooledRemoteClient.RemoteFunction<R, RemoteInterpreterEventService.Client> func) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 453a7e3..6b1e5d4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -166,11 +166,6 @@ public class RemoteInterpreterServer extends Thread
       this.intpEventServerPort = intpEventServerPort;
       this.port = RemoteInterpreterUtils.findAvailablePort(portRange);
       this.host = RemoteInterpreterUtils.findAvailableHostAddress();
-      if (!isTest) {
-        LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port,
-          intpEventServerHost, intpEventServerPort);
-        intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
-      }
     } else {
       // DevInterpreter
       this.port = intpEventServerPort;
@@ -227,6 +222,15 @@ public class RemoteInterpreterServer extends Thread
     } catch (Exception e) {
       throw new TException("Fail to create LifeCycleManager", e);
     }
+
+    if (!isTest) {
+      int connectionPoolSize =
+              this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE);
+      LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}",
+              connectionPoolSize);
+      intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
+              connectionPoolSize);
+    }
   }
 
   @Override
@@ -496,7 +500,8 @@ public class RemoteInterpreterServer extends Thread
       LOGGER.info("Reconnect to this interpreter process from {}:{}", host, port);
       this.intpEventServerHost = host;
       this.intpEventServerPort = port;
-      intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
+      intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
+              this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE));
       intpEventClient.setIntpGroupId(interpreterGroupId);
 
       this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
@@ -600,6 +605,7 @@ public class RemoteInterpreterServer extends Thread
       if (!Thread.currentThread().isInterrupted()) {
         RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
         try {
+          intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, 10);
           LOGGER.info("Registering interpreter process");
           intpEventClient.registerInterpreterProcess(registerInfo);
           LOGGER.info("Registered interpreter process");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index e578dfc..0a5241d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -1107,16 +1107,18 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
   @Override
   public void onNoteRemove(Note note, AuthenticationInfo subject) throws IOException {
     // stop all associated interpreters
-    for (Paragraph paragraph : note.getParagraphs()) {
-      try {
-        Interpreter interpreter = paragraph.getBindedInterpreter();
-        InterpreterSetting interpreterSetting =
-                ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting();
-        restart(interpreterSetting.getId(), subject.getUser(), note.getId());
-      } catch (InterpreterNotFoundException e) {
-
-      } catch (InterpreterException e) {
-        LOGGER.warn("Fail to stop interpreter setting", e);
+    if (note.getParagraphs() != null) {
+      for (Paragraph paragraph : note.getParagraphs()) {
+        try {
+          Interpreter interpreter = paragraph.getBindedInterpreter();
+          InterpreterSetting interpreterSetting =
+                  ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting();
+          restart(interpreterSetting.getId(), subject.getUser(), note.getId());
+        } catch (InterpreterNotFoundException e) {
+
+        } catch (InterpreterException e) {
+          LOGGER.warn("Fail to stop interpreter setting", e);
+        }
       }
     }