Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/11/26 07:10:31 UTC

zeppelin git commit: [ZEPPELIN-3869] Close interpreters in multithreaded mode

Repository: zeppelin
Updated Branches:
  refs/heads/master acd75b136 -> 04b726a0a


[ZEPPELIN-3869] Close interpreters in multithreaded mode

### What is this PR for?

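This PR implements multithreaded closing of interpreters: interpreter groups and the interpreters inside them are now closed in parallel threads instead of one after another.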
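
The general pattern can be sketched in isolation as follows. This is a minimal illustration, not the Zeppelin code: `ParallelCloseSketch`, `closeAll`, and `slowClose` are hypothetical names, and plain `Runnable`s stand in for the `close()` calls of settings, groups, and interpreters. Each close action gets its own named thread with an uncaught-exception handler, and the caller joins all threads, restoring the interrupt flag if it is interrupted while waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelCloseSketch {

  // Starts one thread per close action, then waits for all of them.
  // Returns the number of threads that were joined before any interruption.
  static int closeAll(List<Runnable> closeActions) {
    List<Thread> threads = new ArrayList<>();
    int index = 0;
    for (Runnable action : closeActions) {
      Thread t = new Thread(action, "resource-" + (index++) + "-close");
      // A failure in one close action is reported and does not stop the others.
      t.setUncaughtExceptionHandler((th, e) ->
          System.err.println(th.getName() + " failed: " + e));
      t.start();
      threads.add(t);
    }

    int joined = 0;
    for (Thread t : threads) {
      try {
        t.join();
        joined++;
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting, as the PR does.
        Thread.currentThread().interrupt();
        break;
      }
    }
    return joined;
  }

  // Hypothetical slow resource: sleeps to simulate a blocking close().
  static void slowClose(AtomicInteger closed) {
    try {
      Thread.sleep(200);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    closed.incrementAndGet();
  }

  public static void main(String[] args) {
    AtomicInteger closed = new AtomicInteger();
    List<Runnable> actions = new ArrayList<>();
    for (int n = 0; n < 3; n++) {
      actions.add(() -> slowClose(closed));
    }
    // One action that fails, to exercise the uncaught-exception handler.
    actions.add(() -> {
      throw new IllegalStateException("simulated close failure");
    });

    long start = System.nanoTime();
    int joined = closeAll(actions);
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println("joined=" + joined + " closed=" + closed.get()
        + " elapsedMs=" + elapsedMs);
  }
}
```

Because the slow actions sleep concurrently, the total wait in this sketch is governed by the slowest action rather than by the sum of all of them, which is the effect the benchmark below measures for real interpreter processes.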

The benchmark, `ZeppelinServerBenchmarkTest`, is based on [JMH](http://openjdk.java.net/projects/code-tools/jmh/). It:
* Creates `InterpreterSettingManager` and `InterpreterFactory`;
* Runs 6 interpreters (`python`, `sh`, `md`, `groovy`, `angular`, `spark`) for each user in isolated mode;
* Measures the time taken to close `InterpreterSettingManager`.

Results with `usersCnt` = 7, 5, 3, 1 (42, 30, 18, 6 interpreter processes, respectively):
```
Benchmark                                                 (usersCnt)  Mode  Cnt   Score   Error  Units
ZeppelinServerBenchmarkTest.testDefaultServerShutdown              7  avgt   10  16,690 ± 0,191   s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown              5  avgt   10  11,532 ± 0,064   s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown              3  avgt   10   6,942 ± 0,020   s/op
ZeppelinServerBenchmarkTest.testDefaultServerShutdown              1  avgt   10   2,348 ± 0,018   s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel           7  avgt   10   3,612 ± 0,317   s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel           5  avgt   10   2,635 ± 0,075   s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel           3  avgt   10   2,473 ± 0,072   s/op
ZeppelinServerBenchmarkTest.testServerShutdownInParallel           1  avgt   10   2,378 ± 0,029   s/op
```
[benchmark.zip](https://github.com/TinkoffCreditSystems/zeppelin/files/2591694/benchmark.zip) contains:
* `Benchmark.patch` - patch with `ZeppelinServerBenchmarkTest`;
* `benchmark.csv` - benchmark result;
![jmh-benchmark](https://user-images.githubusercontent.com/6136993/48660410-06706280-ea72-11e8-805f-fe5379f03131.png)
* `benchmark-result.json` - results (you could visualize it using [JMH Visualizer](http://jmh.morethan.io/))
![zp-36](https://user-images.githubusercontent.com/6136993/48660111-5b5daa00-ea6d-11e8-8f13-0df27d5a767d.png)
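
To make the gain easier to read off the results table, the speedup per `usersCnt` can be computed as the default shutdown time divided by the parallel shutdown time. The snippet below is a small sketch that does this arithmetic on the scores reported above; the class and method names are hypothetical, and the error margins are ignored.

```java
import java.util.Locale;

public class ShutdownSpeedup {

  // Speedup = default shutdown time / parallel shutdown time.
  static double speedup(double defaultSeconds, double parallelSeconds) {
    return defaultSeconds / parallelSeconds;
  }

  public static void main(String[] args) {
    // Scores (s/op) copied from the JMH results table above.
    int[] usersCnt = {7, 5, 3, 1};
    double[] defaultShutdown = {16.690, 11.532, 6.942, 2.348};
    double[] parallelShutdown = {3.612, 2.635, 2.473, 2.378};

    for (int i = 0; i < usersCnt.length; i++) {
      System.out.printf(Locale.ROOT, "usersCnt=%d speedup=%.2fx%n",
          usersCnt[i], speedup(defaultShutdown[i], parallelShutdown[i]));
    }
  }
}
```

With these numbers the ratio is roughly 4.6x for 7 users, 4.4x for 5, 2.8x for 3, and about 1.0x for a single user, where there is little left to parallelize.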

### How to run benchmark
1. Apply patch from `benchmark.zip`
2. Add [IntelliJ IDEA JMH Plugin](https://plugins.jetbrains.com/plugin/7529-jmh-plugin)
3. Build Zeppelin
4. Run `ZeppelinServerBenchmarkTest` in IntelliJ IDEA

### What type of PR is it?
Refactoring

### What is the Jira issue?
[[ZEPPELIN-3869]](https://issues.apache.org/jira/browse/ZEPPELIN-3869)

### How should this be tested?
* CI pass

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: George Klimov <kl...@gmail.com>

Closes #3229 from egorklimov/ZP-71 and squashes the following commits:

1cad17a0e [George Klimov] Remove watchdog check
e7abdd305 [George Klimov] Fix issues
4bab30f76 [George Klimov] Add exceptions
380f539f1 [George Klimov] Refactor code
efacb7717 [George Klimov] Fix code style issues
becfdf15f [George Klimov] Parallel close


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/04b726a0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/04b726a0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/04b726a0

Branch: refs/heads/master
Commit: 04b726a0a2935475d26816f99dab6e778e96b5dd
Parents: acd75b1
Author: George Klimov <kl...@gmail.com>
Authored: Wed Nov 14 14:13:22 2018 +0300
Committer: jeffzhang.zjf <je...@alibaba-inc.com>
Committed: Mon Nov 26 15:10:17 2018 +0800

----------------------------------------------------------------------
 .../interpreter/InterpreterSetting.java         | 19 +++++--
 .../interpreter/InterpreterSettingManager.java  | 23 ++++----
 .../interpreter/ManagedInterpreterGroup.java    | 57 +++++++++++++-------
 3 files changed, 64 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 1fc231c..4411674 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -58,6 +58,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
@@ -477,10 +478,22 @@ public class InterpreterSetting {
 
   public void close() {
     LOGGER.info("Close InterpreterSetting: " + name);
-    for (ManagedInterpreterGroup intpGroup : interpreterGroups.values()) {
-      intpGroup.close();
-    }
+    List<Thread> closeThreads = interpreterGroups.values().stream()
+            .map(g -> new Thread(g::close, name + "-close"))
+            .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+                    LOGGER.error("InterpreterSetting close error", e)))
+            .peek(Thread::start)
+            .collect(Collectors.toList());
     interpreterGroups.clear();
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOGGER.error("Can't wait InterpreterSetting close threads", e);
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
   }
 
   public void setProperties(Object object) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 8a1ccb0..be6e8f4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -76,10 +76,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 
 /**
@@ -850,23 +850,20 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
   }
 
   public void close() {
-    List<Thread> closeThreads = new LinkedList<>();
-    for (final InterpreterSetting intpSetting : interpreterSettings.values()) {
-      Thread t =
-          new Thread() {
-            public void run() {
-              intpSetting.close();
-            }
-          };
-      t.start();
-      closeThreads.add(t);
-    }
+    List<Thread> closeThreads = interpreterSettings.values().stream()
+            .map(intpSetting-> new Thread(intpSetting::close, intpSetting.getId() + "-close"))
+            .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+                    LOGGER.error("interpreterGroup close error", e)))
+            .peek(Thread::start)
+            .collect(Collectors.toList());
 
     for (Thread t : closeThreads) {
       try {
         t.join();
       } catch (InterruptedException e) {
-        LOGGER.error("Can't close interpreterGroup", e);
+        LOGGER.error("Can't wait close interpreterGroup threads", e);
+        Thread.currentThread().interrupt();
+        break;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/04b726a0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index faa77f1..9454988 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -18,7 +18,6 @@
 
 package org.apache.zeppelin.interpreter;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -30,6 +29,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 /**
  * ManagedInterpreterGroup runs under zeppelin server
@@ -82,7 +82,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
   /**
    * Close all interpreter instances in this group
    */
-  public synchronized void close() {
+  public void close() {
     LOGGER.info("Close InterpreterGroup: " + id);
     for (String sessionId : sessions.keySet()) {
       close(sessionId);
@@ -95,7 +95,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
    */
   public synchronized void close(String sessionId) {
     LOGGER.info("Close Session: " + sessionId + " for interpreter setting: " +
-        interpreterSetting.getName());
+            interpreterSetting.getName());
     close(sessions.remove(sessionId));
     //TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
     if (sessions.isEmpty() && interpreterSetting != null) {
@@ -118,27 +118,46 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     if (interpreters == null) {
       return;
     }
-
-    for (Interpreter interpreter : interpreters) {
-      Scheduler scheduler = interpreter.getScheduler();
-      for (Job job : scheduler.getAllJobs()) {
-        job.abort();
-        job.setStatus(Job.Status.ABORT);
-        LOGGER.info("Job " + job.getJobName() + " aborted ");
-      }
-
+    List<Thread> closeThreads = interpreters.stream()
+            .map(interpreter -> new Thread(() ->
+                    closeInterpreter(interpreter),
+                    interpreter.getClass().getSimpleName() + "-close"))
+            .peek(t -> t.setUncaughtExceptionHandler((th, e) ->
+                    LOGGER.error("Interpreter close error", e)))
+            .peek(Thread::start)
+            .collect(Collectors.toList());
+
+    for (Thread t : closeThreads) {
       try {
-        interpreter.close();
-      } catch (InterpreterException e) {
-        LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
-      }
-      //TODO(zjffdu) move the close of schedule to Interpreter
-      if (null != scheduler) {
-        SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+        t.join();
+      } catch (InterruptedException e) {
+        LOGGER.error("Can't wait interpreter close threads", e);
+        Thread.currentThread().interrupt();
+        break;
       }
     }
   }
 
+  private void closeInterpreter(Interpreter interpreter) {
+    Scheduler scheduler = interpreter.getScheduler();
+
+    for (final Job job : scheduler.getAllJobs()) {
+      job.abort();
+      job.setStatus(Job.Status.ABORT);
+      LOGGER.info("Job " + job.getJobName() + " aborted ");
+    }
+
+    try {
+      LOGGER.info("Trying to close interpreter " + interpreter.getClassName());
+      interpreter.close();
+    } catch (InterpreterException e) {
+      LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
+    }
+
+    //TODO(zjffdu) move the close of schedule to Interpreter
+    SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+  }
+
   public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {
     if (sessions.containsKey(sessionId)) {
       return sessions.get(sessionId);