You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/11/26 07:10:31 UTC
zeppelin git commit: [ZEPPELIN-3869] Close interpreters in
multithreaded mode
Repository: zeppelin
Updated Branches:
refs/heads/master acd75b136 -> 04b726a0a
[ZEPPELIN-3869] Close interpreters in multithreaded mode
### What is this PR for?
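This PR makes interpreter shutdown multithreaded: interpreter settings, interpreter groups and individual interpreters are now closed in parallel threads instead of one after another.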
The benchmark `ZeppelinServerBenchmarkTest`, built with [JMH](http://openjdk.java.net/projects/code-tools/jmh/):
* Creates `InterpreterSettingManager` and `InterpreterFactory`;
* Runs 6 interpreters (`python`, `sh`, `md`, `groovy`, `angular`, `spark`) for each user in isolated mode;
* Measures how long it takes to close the `InterpreterSettingManager`.
Results for usersCnt = 7, 5, 3 and 1 (42, 30, 18 and 6 interpreter processes, respectively):
```
Benchmark (usersCnt) Mode Cnt Score Error Units
ZeppelinServerBenchmarkTest.testDefaultServerShutdown 7 avgt 10 16,690 ± 0,191 s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown 5 avgt 10 11,532 ± 0,064 s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown 3 avgt 10 6,942 ± 0,020 s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown 1 avgt 10 2,348 ± 0,018 s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel 7 avgt 10 3,612 ± 0,317 s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel 5 avgt 10 2,635 ± 0,075 s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel 3 avgt 10 2,473 ± 0,072 s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel 1 avgt 10 2,378 ± 0,029 s/op
```
[benchmark.zip](https://github.com/TinkoffCreditSystems/zeppelin/files/2591694/benchmark.zip) contains:
* `Benchmark.patch` - patch with `ZeppelinServerBenchmarkTest`;
* `benchmark.csv` - benchmark results;
![jmh-benchmark](https://user-images.githubusercontent.com/6136993/48660410-06706280-ea72-11e8-805f-fe5379f03131.png)
* `benchmark-result.json` - results (you can visualize them using [JMH Visualizer](http://jmh.morethan.io/))
![zp-36](https://user-images.githubusercontent.com/6136993/48660111-5b5daa00-ea6d-11e8-8f13-0df27d5a767d.png)
### How to run benchmark
1. Apply patch from `benchmark.zip`
2. Install the [IntelliJ IDEA JMH Plugin](https://plugins.jetbrains.com/plugin/7529-jmh-plugin)
3. Build Zeppelin
4. Run ZeppelinServerBenchmarkTest in IntelliJ IDEA
### What type of PR is it?
Refactoring
### What is the Jira issue?
[[ZEPPELIN-3869]](https://issues.apache.org/jira/browse/ZEPPELIN-3869)
### How should this be tested?
* CI pass
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: George Klimov <kl...@gmail.com>
Closes #3229 from egorklimov/ZP-71 and squashes the following commits:
1cad17a0e [George Klimov] Remove watchdog check
e7abdd305 [George Klimov] Fix issues
4bab30f76 [George Klimov] Add exceptions
380f539f1 [George Klimov] Refactor code
efacb7717 [George Klimov] Fix code style issues
becfdf15f [George Klimov] Parallel close
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/04b726a0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/04b726a0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/04b726a0
Branch: refs/heads/master
Commit: 04b726a0a2935475d26816f99dab6e778e96b5dd
Parents: acd75b1
Author: George Klimov <kl...@gmail.com>
Authored: Wed Nov 14 14:13:22 2018 +0300
Committer: jeffzhang.zjf <je...@alibaba-inc.com>
Committed: Mon Nov 26 15:10:17 2018 +0800
----------------------------------------------------------------------
.../interpreter/InterpreterSetting.java | 19 +++++--
.../interpreter/InterpreterSettingManager.java | 23 ++++----
.../interpreter/ManagedInterpreterGroup.java | 57 +++++++++++++-------
3 files changed, 64 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 1fc231c..4411674 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -58,6 +58,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
@@ -477,10 +478,22 @@ public class InterpreterSetting {
public void close() {
LOGGER.info("Close InterpreterSetting: " + name);
- for (ManagedInterpreterGroup intpGroup : interpreterGroups.values()) {
- intpGroup.close();
- }
+ List<Thread> closeThreads = interpreterGroups.values().stream()
+ .map(g -> new Thread(g::close, name + "-close"))
+ .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+ LOGGER.error("InterpreterSetting close error", e)))
+ .peek(Thread::start)
+ .collect(Collectors.toList());
interpreterGroups.clear();
+ for (Thread t : closeThreads) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("Can't wait InterpreterSetting close threads", e);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
}
public void setProperties(Object object) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 8a1ccb0..be6e8f4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -76,10 +76,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
@@ -850,23 +850,20 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
}
public void close() {
- List<Thread> closeThreads = new LinkedList<>();
- for (final InterpreterSetting intpSetting : interpreterSettings.values()) {
- Thread t =
- new Thread() {
- public void run() {
- intpSetting.close();
- }
- };
- t.start();
- closeThreads.add(t);
- }
+ List<Thread> closeThreads = interpreterSettings.values().stream()
+ .map(intpSetting-> new Thread(intpSetting::close, intpSetting.getId() + "-close"))
+ .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+ LOGGER.error("interpreterGroup close error", e)))
+ .peek(Thread::start)
+ .collect(Collectors.toList());
for (Thread t : closeThreads) {
try {
t.join();
} catch (InterruptedException e) {
- LOGGER.error("Can't close interpreterGroup", e);
+ LOGGER.error("Can't wait close interpreterGroup threads", e);
+ Thread.currentThread().interrupt();
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index faa77f1..9454988 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.interpreter;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
@@ -30,6 +29,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Collectors;
/**
* ManagedInterpreterGroup runs under zeppelin server
@@ -82,7 +82,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
/**
* Close all interpreter instances in this group
*/
- public synchronized void close() {
+ public void close() {
LOGGER.info("Close InterpreterGroup: " + id);
for (String sessionId : sessions.keySet()) {
close(sessionId);
@@ -95,7 +95,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
*/
public synchronized void close(String sessionId) {
LOGGER.info("Close Session: " + sessionId + " for interpreter setting: " +
- interpreterSetting.getName());
+ interpreterSetting.getName());
close(sessions.remove(sessionId));
//TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
if (sessions.isEmpty() && interpreterSetting != null) {
@@ -118,27 +118,46 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
if (interpreters == null) {
return;
}
-
- for (Interpreter interpreter : interpreters) {
- Scheduler scheduler = interpreter.getScheduler();
- for (Job job : scheduler.getAllJobs()) {
- job.abort();
- job.setStatus(Job.Status.ABORT);
- LOGGER.info("Job " + job.getJobName() + " aborted ");
- }
-
+ List<Thread> closeThreads = interpreters.stream()
+ .map(interpreter -> new Thread(() ->
+ closeInterpreter(interpreter),
+ interpreter.getClass().getSimpleName() + "-close"))
+ .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+ LOGGER.error("Interpreter close error", e)))
+ .peek(Thread::start)
+ .collect(Collectors.toList());
+
+ for (Thread t : closeThreads) {
try {
- interpreter.close();
- } catch (InterpreterException e) {
- LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
- }
- //TODO(zjffdu) move the close of schedule to Interpreter
- if (null != scheduler) {
- SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+ t.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("Can't wait interpreter close threads", e);
+ Thread.currentThread().interrupt();
+ break;
}
}
}
+ private void closeInterpreter(Interpreter interpreter) {
+ Scheduler scheduler = interpreter.getScheduler();
+
+ for (final Job job : scheduler.getAllJobs()) {
+ job.abort();
+ job.setStatus(Job.Status.ABORT);
+ LOGGER.info("Job " + job.getJobName() + " aborted ");
+ }
+
+ try {
+ LOGGER.info("Trying to close interpreter " + interpreter.getClassName());
+ interpreter.close();
+ } catch (InterpreterException e) {
+ LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
+ }
+
+ //TODO(zjffdu) move the close of schedule to Interpreter
+ SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+ }
+
public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {
if (sessions.containsKey(sessionId)) {
return sessions.get(sessionId);