You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/17 15:19:14 UTC
[zeppelin] branch master updated: [ZEPPELIN-5151]. connection pool
size is not set in RemoteInterpreterEventClient
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 04d86db [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
04d86db is described below
commit 04d86db047d46da37a6310969cf008c34ceb0f23
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Dec 7 10:28:57 2020 +0800
[ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
### What is this PR for?
The connection pool size property is only set on the zeppelin-server side, but not on the interpreter process side. This PR fixes this issue by recreating RemoteInterpreterEventClient after it gets the zeppelin site configuration in the init method.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/projects/ZEPPELIN/issues/ZEPPELIN-5151
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3988 from zjffdu/ZEPPELIN-5151 and squashes the following commits:
1b49c6f9c [Jeff Zhang] [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient
---
.../remote/RemoteInterpreterEventClient.java | 4 ++--
.../remote/RemoteInterpreterServer.java | 18 ++++++++++++------
.../interpreter/InterpreterSettingManager.java | 22 ++++++++++++----------
3 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 3eed43d..c2f38a3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -64,7 +64,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient;
private String intpGroupId;
- public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) {
+ public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort, int connectionPoolSize) {
this.remoteClient = new PooledRemoteClient<>(() -> {
TSocket transport = new TSocket(intpEventHost, intpEventPort);
try {
@@ -74,7 +74,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
}
TProtocol protocol = new TBinaryProtocol(transport);
return new RemoteInterpreterEventService.Client(protocol);
- });
+ }, connectionPoolSize);
}
public <R> R callRemoteFunction(PooledRemoteClient.RemoteFunction<R, RemoteInterpreterEventService.Client> func) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 453a7e3..6b1e5d4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -166,11 +166,6 @@ public class RemoteInterpreterServer extends Thread
this.intpEventServerPort = intpEventServerPort;
this.port = RemoteInterpreterUtils.findAvailablePort(portRange);
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
- if (!isTest) {
- LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port,
- intpEventServerHost, intpEventServerPort);
- intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
- }
} else {
// DevInterpreter
this.port = intpEventServerPort;
@@ -227,6 +222,15 @@ public class RemoteInterpreterServer extends Thread
} catch (Exception e) {
throw new TException("Fail to create LifeCycleManager", e);
}
+
+ if (!isTest) {
+ int connectionPoolSize =
+ this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE);
+ LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}",
+ connectionPoolSize);
+ intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
+ connectionPoolSize);
+ }
}
@Override
@@ -496,7 +500,8 @@ public class RemoteInterpreterServer extends Thread
LOGGER.info("Reconnect to this interpreter process from {}:{}", host, port);
this.intpEventServerHost = host;
this.intpEventServerPort = port;
- intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
+ intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
+ this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE));
intpEventClient.setIntpGroupId(interpreterGroupId);
this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
@@ -600,6 +605,7 @@ public class RemoteInterpreterServer extends Thread
if (!Thread.currentThread().isInterrupted()) {
RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
try {
+ intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, 10);
LOGGER.info("Registering interpreter process");
intpEventClient.registerInterpreterProcess(registerInfo);
LOGGER.info("Registered interpreter process");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index e578dfc..0a5241d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -1107,16 +1107,18 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
@Override
public void onNoteRemove(Note note, AuthenticationInfo subject) throws IOException {
// stop all associated interpreters
- for (Paragraph paragraph : note.getParagraphs()) {
- try {
- Interpreter interpreter = paragraph.getBindedInterpreter();
- InterpreterSetting interpreterSetting =
- ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting();
- restart(interpreterSetting.getId(), subject.getUser(), note.getId());
- } catch (InterpreterNotFoundException e) {
-
- } catch (InterpreterException e) {
- LOGGER.warn("Fail to stop interpreter setting", e);
+ if (note.getParagraphs() != null) {
+ for (Paragraph paragraph : note.getParagraphs()) {
+ try {
+ Interpreter interpreter = paragraph.getBindedInterpreter();
+ InterpreterSetting interpreterSetting =
+ ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting();
+ restart(interpreterSetting.getId(), subject.getUser(), note.getId());
+ } catch (InterpreterNotFoundException e) {
+
+ } catch (InterpreterException e) {
+ LOGGER.warn("Fail to stop interpreter setting", e);
+ }
}
}