Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2020/09/25 13:52:56 UTC

[GitHub] [zeppelin] Reamer opened a new pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Reamer opened a new pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925


   ### What is this PR for?
   This PR touches the start and stop procedures of all interpreters.
    - Improve the start script following [shellcheck](https://www.shellcheck.net/) recommendations
    - Use `exec [...]` instead of `eval [..] &`, which means that the Java interpreter process is not a fork of the shell script
      -> no trap handling required in the start script
      -> signals land in the JVM
    - Remove anonymous threads in `RemoteInterpreterServer.java` and give the threads descriptive names
    - Use TServer's `stopTimeoutVal` and `stopTimeoutUnit` for faster shutdown
    - Remove special K8s shutdown handling
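
As an illustration of the named-thread item in the list above, here is a minimal sketch of replacing anonymous threads with named ones. The class `NamedThreads`, the helper `namedFactory`, and the thread name `ShutdownThread` are hypothetical and not taken from the PR; the sketch only shows the general technique with the standard `ThreadFactory` interface.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreads {

  // Hypothetical helper: builds threads named "<prefix>-<n>" so that they are
  // easy to identify in thread dumps and log output.
  static ThreadFactory namedFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
  }

  public static void main(String[] args) throws InterruptedException {
    // Instead of an anonymous "new Thread() { ... }.start()", name the thread.
    Thread shutdown = namedFactory("ShutdownThread").newThread(
        () -> System.out.println("running in " + Thread.currentThread().getName()));
    shutdown.start();
    shutdown.join(); // prints "running in ShutdownThread-1"
  }
}
```

With named threads, a thread dump taken during a slow shutdown shows which component is still running instead of a generic `Thread-7`.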
   
   ### What type of PR is it?
    - Improvement 
   
   ### Todos
   * [ ] - Task
   
   ### What is the Jira issue?
   * https://issues.apache.org/jira/browse/ZEPPELIN-5070
   
   ### How should this be tested?
   * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/730239380
   
   ### Questions:
   * Do the license files need an update? No
   * Are there breaking changes for older versions? No
   * Does this need documentation? No
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [zeppelin] Reamer commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r516099691



##########
File path: zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
##########
@@ -171,7 +171,7 @@ public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
       sparkConfBuilder.append(" --proxy-user " + context.getUserName());
     }
 
-    env.put("ZEPPELIN_SPARK_CONF", escapeSpecialCharacter(sparkConfBuilder.toString()));
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());

Review comment:
       Escaping `ZEPPELIN_SPARK_CONF` is no longer necessary, because all values are passed to `exec` as string arguments. Bash handles the escaping of these string values automatically.
   Because `exec` replaces the bash process with the JVM process, it is not possible to insert further commands after `exec`.
   
   What do you think?

##########
File path: rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
##########
@@ -221,7 +221,7 @@ public void testInvalidShinyApp()
 
     resultMessages = context2.out.toInterpreterResultMessage();
     assertTrue(resultMessages.get(1).getData(),
-            resultMessages.get(1).getData().contains("object 'Invalid_code' not found"));
+            resultMessages.get(1).getData().contains("'Invalid_code'"));

Review comment:
       My system language is German. When I run tests locally, some tests fail because the JVM translates the error message into German. With commit https://github.com/apache/zeppelin/pull/3925/commits/ea099abef02a0b971c863c1bb1b70aa149bb74fc I have removed the part that is translated by the JVM.

##########
File path: python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
##########
@@ -475,7 +475,7 @@ public void testIPythonFailToLaunch() throws InterpreterException {
       fail("Should not be able to start IPythonInterpreter");
     } catch (InterpreterException e) {
       String exceptionMsg = ExceptionUtils.getStackTrace(e);
-      assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
+      assertTrue(exceptionMsg, exceptionMsg.contains("java.io.IOException"));

Review comment:
       My system language is German. When I run tests locally, some tests fail because the JVM translates the error message into German. With commit https://github.com/apache/zeppelin/pull/3925/commits/ea099abef02a0b971c863c1bb1b70aa149bb74fc I have removed the part that is translated by the JVM.
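
The idea behind the change above, asserting on something that does not depend on the system language, can be sketched as follows. This is an illustration only: the class `LocaleIndependentAssertion` and the file path are hypothetical, and the sketch assumes that the file does not exist in the working directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

final class LocaleIndependentAssertion {

  // Tries to read a file that is assumed not to exist and returns the
  // resulting exception (or null if the read unexpectedly succeeds).
  static IOException readMissingFile() {
    Path missing = Paths.get("definitely-missing-dir", "no-such-file.txt");
    try {
      Files.readAllBytes(missing);
      return null;
    } catch (IOException e) {
      return e;
    }
  }

  public static void main(String[] args) {
    IOException e = readMissingFile();
    // Brittle: matching on human-readable message text, which may be
    // worded differently depending on platform or language settings.
    // More robust: check the exception type, which is part of the API.
    System.out.println(e instanceof NoSuchFileException);
  }
}
```

The trade-off is that a type check is less specific than a message check, so it may also pass for failures other than the one the test intends to provoke.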

##########
File path: zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
##########
@@ -1232,7 +1234,7 @@ public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException, In
       setting.getOption().setPerNote(setting.getOption().SCOPED);
       notebook.getInterpreterSettingManager().restart(setting.getId());
     }
-
+    Thread.sleep(2000);

Review comment:
       I added these lines during debugging because of failed tests. But that was before adding this [magic line](https://github.com/apache/zeppelin/pull/3925/files#diff-87d8954289365d4fc9ab53cd9b2257ac28cbdef284ec83565e4d5bf9bafc6eedR248) that removes the shutdown hook thread in case of a regular shutdown. I will try to remove the sleep statements.
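
A minimal sketch of deregistering a shutdown hook on a regular shutdown, as described in the comment above. The linked line is not reproduced here; this sketch assumes it relies on the standard `Runtime.removeShutdownHook` call, and the class `ShutdownHookDemo` and its methods are hypothetical.

```java
final class ShutdownHookDemo {

  // Hook that would run cleanup if the JVM exits without a regular stop().
  private final Thread hook = new Thread(
      () -> System.out.println("emergency cleanup"), "ShutdownHook");

  void start() {
    Runtime.getRuntime().addShutdownHook(hook);
  }

  // Regular shutdown path: after cleaning up, deregister the hook so it does
  // not run a second cleanup at JVM exit. Returns true if the hook was
  // registered and has now been removed.
  boolean stop() {
    // ... regular cleanup would go here ...
    return Runtime.getRuntime().removeShutdownHook(hook);
  }

  public static void main(String[] args) {
    ShutdownHookDemo demo = new ShutdownHookDemo();
    demo.start();
    System.out.println("hook removed: " + demo.stop());
  }
}
```

Note that `removeShutdownHook` throws `IllegalStateException` if the JVM is already shutting down, so it is only suitable for the regular shutdown path.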

##########
File path: zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
##########
@@ -1232,7 +1234,7 @@ public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException, In
       setting.getOption().setPerNote(setting.getOption().SCOPED);
       notebook.getInterpreterSettingManager().restart(setting.getId());
     }
-
+    Thread.sleep(2000);

Review comment:
       I have removed the sleep statements.







[GitHub] [zeppelin] zjffdu commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-705950569


   @Reamer I have reverted the Thrift shading; you can rebase and try again.





[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-721134454


   Rebased to the current master to get the [Python hotfix](https://github.com/apache/zeppelin/commit/40882956396660bf213dbb5757120a2010c3c04a)





[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-720642418


   @zjffdu Thank you very much for your review. I have applied your suggestions or answered your questions. Please check this PR again so that it can be merged safely.





[GitHub] [zeppelin] zjffdu commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r495516862



##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
##########
@@ -132,7 +129,7 @@
 
   private Map<String, Object> remoteWorksResponsePool;
 
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+  private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;

Review comment:
       We can move the static fields to the top of the class.







[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-715112898


   Finally I am finished with this function.
   @zjffdu Can you please review again?
   
   PS: Next week I will be on holiday, so I cannot answer.





[GitHub] [zeppelin] zjffdu commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-699573811


   Thanks @Reamer Just one minor comment, otherwise LGTM





[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-705960179


   > @Reamer I have reverted the Thrift shading; you can rebase and try again.
   
   Thank you. I will split this PR because it is too big.





[GitHub] [zeppelin] Reamer commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r517433947



##########
File path: python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
##########
@@ -475,7 +475,7 @@ public void testIPythonFailToLaunch() throws InterpreterException {
       fail("Should not be able to start IPythonInterpreter");
     } catch (InterpreterException e) {
       String exceptionMsg = ExceptionUtils.getStackTrace(e);
-      assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
+      assertTrue(exceptionMsg, exceptionMsg.contains("java.io.IOException"));

Review comment:
       I looked for a solution but unfortunately did not find a suitable one. Since this change is only incidental to this PR, I will remove it.







[GitHub] [zeppelin] saLeox commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
saLeox commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-1075893018


   @Reamer @zjffdu Hi, sorry to comment here so long after the merge.
   Regarding the second improvement:
   ```
   Use exec [...] instead of eval [..] &, which means that the Java interpreter process is not a fork of the shell script
   -> no trap handling required in the start script
   -> signals land in the JVM
   ```
   I encountered an issue in version 0.10 with the Python interpreter and am reporting it here.
   Given that
   ```
   export ZEPPELIN_IMPERSONATE_CMD='sudo -H -u ${ZEPPELIN_IMPERSONATE_USER} bash -c '
   ```
   and **User Impersonate** is chosen for the Python interpreter,
   running a paragraph produces the following error:
   ```
   [INFO] Interpreter launch command: sudo -H -u suns bash -c source /home/zeppelin/zeppelin/conf/zeppelin-env.sh; java -Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///home/zeppelin/zeppelin/conf/log4j.properties -Dlog4j.configurationFile=file:///home/zeppelin/zeppelin/conf/log4j2.properties -Dzeppelin.log.file=/home/zeppelin/zeppelin/logs/zeppelin-interpreter-python-suns-suns-zeppelin-data-zeppelin00.log -Xms2048m -Xmx2048m -XX:MaxPermSize=512m -cp :/home/zeppelin/zeppelin/local-repo/python/*:/home/zeppelin/zeppelin/interpreter/python/*:::/home/zeppelin/zeppelin/interpreter/zeppelin-interpreter-shaded-0.11.0-SNAPSHOT.jar org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer 10.71.27.67 23456 python-suns :
   /home/zeppelin/zeppelin/bin/interpreter.sh: line 324: exec: sudo -H -u suns bash -c: not found
   ```
   When I change `exec` back to `eval` for the **Python** interpreter only, the problem is gone.
   But the Spark interpreter (I am not sure about the other interpreters) still needs to use `exec`; otherwise I get another error.
   So the workaround on my side looks like this:
   ```
   if [[ "${INTERPRETER_ID}" == "python" ]]; then
       eval "${INTERPRETER_RUN_COMMAND[@]}"
   else
       exec "${INTERPRETER_RUN_COMMAND[@]}"
   fi
   ```
   
   Would it be possible to take a look and figure out a solution?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@zeppelin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-721808252


   If there are no further comments, I will merge this into Master and Branch-0.9 on Friday (06.11.2020).





[GitHub] [zeppelin] Reamer commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r499720708



##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
##########
@@ -132,7 +129,7 @@
 
   private Map<String, Object> remoteWorksResponsePool;
 
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+  private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;

Review comment:
       Done







[GitHub] [zeppelin] saLeox commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
saLeox commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-1075995398


   @Reamer Thanks for sharing and for letting me know the current situation.





[GitHub] [zeppelin] Reamer commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r501730956



##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);

Review comment:
       Remove this log statement or lower it to debug level.
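
The two-phase shutdown in `destroy()` in the diff above follows the pattern recommended in the `ExecutorService` Javadoc. A minimal standalone sketch of that pattern is shown here; the class `GracefulShutdown` and the method `shutdownAndAwait` are hypothetical names, and the timeout used in `main` is chosen for illustration rather than taken from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {

  // Returns true if the executor terminated within the allowed time.
  static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // phase 1: reject new tasks, let queued tasks finish
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      executor.shutdownNow(); // phase 2: interrupt tasks that are still running
      return executor.awaitTermination(timeout, unit);
    } catch (InterruptedException ie) {
      executor.shutdownNow(); // (re-)cancel if the current thread was interrupted
      Thread.currentThread().interrupt(); // preserve interrupt status
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(() -> System.out.println("task done"));
    System.out.println("terminated: " + shutdownAndAwait(executor, 5, TimeUnit.SECONDS));
  }
}
```

Compared with calling `shutdownNow()` directly, this gives running tasks a chance to finish before they are interrupted, at the cost of a longer worst-case shutdown time.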

##########
File path: bin/interpreter.sh
##########
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-
+set -x

Review comment:
       Remove

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
-  
+
   public Scheduler createOrGetScheduler(Scheduler scheduler) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(scheduler.getName())) {
-        schedulers.put(scheduler.getName(), scheduler);
-        executor.execute(scheduler);
-      }
-      return schedulers.get(scheduler.getName());
+    LOGGER.info("register new scheduler {}", scheduler.getName());

Review comment:
       Remove this log statement or lower it to debug level.
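
The diff above replaces `synchronized (schedulers)` with a `ConcurrentHashMap` but keeps the `containsKey` / `put` / `get` sequence. As a side note that is not taken from the PR: one way to keep create-or-get atomic on a `ConcurrentHashMap` is `computeIfAbsent`. The sketch below uses a hypothetical stand-in class `NamedScheduler` rather than Zeppelin's scheduler types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CreateOrGet {

  // Hypothetical stand-in for a scheduler; only the name matters here.
  static final class NamedScheduler {
    final String name;

    NamedScheduler(String name) {
      this.name = name;
    }
  }

  private final Map<String, NamedScheduler> schedulers = new ConcurrentHashMap<>();
  final AtomicInteger created = new AtomicInteger();

  NamedScheduler createOrGet(String name) {
    // On a ConcurrentHashMap the mapping function is applied at most once
    // per absent key, so two concurrent callers get the same instance.
    return schedulers.computeIfAbsent(name, key -> {
      created.incrementAndGet();
      return new NamedScheduler(key);
    });
  }

  public static void main(String[] args) {
    CreateOrGet factory = new CreateOrGet();
    NamedScheduler a = factory.createOrGet("interpreter_1");
    NamedScheduler b = factory.createOrGet("interpreter_1");
    System.out.println(a == b);                // true
    System.out.println(factory.created.get()); // 1
  }
}
```

The mapping function should stay short and must not modify the same map, so side effects such as submitting the new scheduler to an executor would need some care in a real implementation.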

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);

Review comment:
       Remove

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);

Review comment:
       Remove or Debug

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);

Review comment:
       Remove or Debug
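
For readers who want to try the two-phase shutdown from the `destroy()` hunk above in isolation, here is a minimal, self-contained sketch. It follows the pattern documented for `java.util.concurrent.ExecutorService`. The class name `GracefulShutdownSketch`, the method `shutdownAndAwait`, and the timeout parameter are assumptions made for illustration; the hunk itself hard-codes two waits of 60 seconds each.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
final class GracefulShutdownSketch {

  /** Returns true if the pool terminated after at most two waits of the given timeout. */
  static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
    pool.shutdown(); // reject new tasks; queued and running tasks may still finish
    try {
      if (pool.awaitTermination(timeout, unit)) {
        return true;
      }
      pool.shutdownNow(); // interrupt running tasks and drop the queued ones
      return pool.awaitTermination(timeout, unit);
    } catch (InterruptedException ie) {
      pool.shutdownNow(); // (re-)cancel if the waiting thread was interrupted
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(() -> System.out.println("task ran"));
    System.out.println("terminated: " + shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
  }
}
```

Returning a boolean instead of logging lets the caller decide how loudly to report a pool that refuses to stop.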







[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-700567588


   @zjffdu 
   I have noticed that [hive-exec-2.3.4.jar](https://github.com/apache/zeppelin/blob/2e69f712329ff0ce1af964df08c89328f0be84aa/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java#L80) is used in the Flink tests. hive-exec-2.3.4.jar includes [libthrift](https://github.com/apache/hive/blob/rel/release-2.3.4/ql/pom.xml#L300-L304) in version [0.9.3](https://github.com/apache/hive/blob/rel/release-2.3.4/pom.xml#L175).
   
   When I start ZSessionIntegrationTest:testZSession_Flink I get the following error:
   ```
   Exception in thread "main" java.lang.NoSuchMethodError: org.apache.thrift.server.TThreadPoolServer$Args.stopTimeoutUnit(Ljava/util/concurrent/TimeUnit;)Lorg/apache/thrift/server/TThreadPoolServer$Args;
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.<init>(RemoteInterpreterServer.java:192)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.<init>(RemoteInterpreterServer.java:154)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.main(RemoteInterpreterServer.java:266)
   ```
   
   How can we solve this?
   
   I think that https://github.com/apache/zeppelin/commit/8e5e4b5e47a01e90a39551ad0a62267175476268 is related to this.
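
A `NoSuchMethodError` like the one above usually means that an older copy of a class won on the classpath. The following is a minimal diagnostic sketch, not part of the PR: it reports where a class was loaded from and whether it exposes a public method with a given name. The class `ClasspathProbe` and its method names are assumptions made for illustration, and the defaults in `main` use a JDK class only so that the sketch runs without extra jars.

```java
import java.security.CodeSource;
import java.util.Arrays;

// Hypothetical diagnostic helper, not part of the PR.
final class ClasspathProbe {

  /** Returns the jar or directory a class was loaded from, or a marker for JDK classes. */
  static String originOf(Class<?> type) {
    CodeSource source = type.getProtectionDomain().getCodeSource();
    if (source == null || source.getLocation() == null) {
      return "<bootstrap/unknown>"; // classes from the JDK itself have no code source
    }
    return source.getLocation().toString();
  }

  /** Returns true if the class has at least one public method with the given name. */
  static boolean hasPublicMethod(Class<?> type, String methodName) {
    return Arrays.stream(type.getMethods()).anyMatch(m -> m.getName().equals(methodName));
  }

  public static void main(String[] args) throws ClassNotFoundException {
    // Nested classes need their binary name, e.g. "Outer$Inner".
    String className = args.length > 0 ? args[0] : "java.util.concurrent.ThreadPoolExecutor";
    String methodName = args.length > 1 ? args[1] : "awaitTermination";
    Class<?> type = Class.forName(className);
    System.out.println(className + " loaded from " + originOf(type));
    System.out.println("has " + methodName + ": " + hasPublicMethod(type, methodName));
  }
}
```

Run with the interpreter's classpath and the arguments `'org.apache.thrift.server.TThreadPoolServer$Args' stopTimeoutUnit`, this should show which jar supplies the class and whether that copy has the method.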





[GitHub] [zeppelin] Reamer commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r501730956



##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);

Review comment:
       Remove or debug

##########
File path: bin/interpreter.sh
##########
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-
+set -x

Review comment:
       Remove

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
-  
+
   public Scheduler createOrGetScheduler(Scheduler scheduler) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(scheduler.getName())) {
-        schedulers.put(scheduler.getName(), scheduler);
-        executor.execute(scheduler);
-      }
-      return schedulers.get(scheduler.getName());
+    LOGGER.info("register new scheduler {}", scheduler.getName());

Review comment:
       remove or debug
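
Separately from the logging remark: the hunks above replace the `synchronized (schedulers)` blocks with a `containsKey` check followed by `put` on a `ConcurrentHashMap`. Each call is thread-safe on its own, but the pair is not atomic, so two threads could both create a scheduler for the same name. One possible alternative, sketched under stated assumptions and not taken from the PR, is `computeIfAbsent`. `SchedulerRegistrySketch` and `NamedTask` are hypothetical stand-ins for the factory and the scheduler classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the factory, not the PR's code.
final class SchedulerRegistrySketch {

  /** Hypothetical stand-in for a scheduler; it only carries its name. */
  static final class NamedTask implements Runnable {
    final String name;

    NamedTask(String name) {
      this.name = name;
    }

    @Override
    public void run() {
      // A real scheduler would run its job loop here.
    }
  }

  private final Map<String, NamedTask> tasks = new ConcurrentHashMap<>();
  private final Executor executor;
  final AtomicInteger created = new AtomicInteger(); // counts creations, for demonstration

  SchedulerRegistrySketch(Executor executor) {
    this.executor = executor;
  }

  NamedTask createOrGet(String name) {
    // computeIfAbsent runs the mapping function at most once per absent key,
    // so the check and the insert happen as one atomic step.
    return tasks.computeIfAbsent(name, key -> {
      NamedTask task = new NamedTask(key);
      created.incrementAndGet();
      // Assumption: the executor hands the task to another thread, so this
      // call returns quickly and does not block other updates to the map.
      executor.execute(task);
      return task;
    });
  }
}
```

The mapping function should stay short and must not modify the same map, which is why it only constructs the task and hands it off.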

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);

Review comment:
       Remove

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);

Review comment:
       Remove or Debug
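
The hunk above also switches `singleton()` to the initialization-on-demand holder idiom. As a minimal, self-contained sketch of how that idiom behaves (the class `LazyFactorySketch` and its counter are assumptions made for illustration, not Zeppelin code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not Zeppelin code.
final class LazyFactorySketch {

  /** Counts constructor calls so that the lazy creation can be observed. */
  static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

  private LazyFactorySketch() {
    CONSTRUCTIONS.incrementAndGet();
  }

  // The nested class is not initialized until instance() first touches it.
  // JVM class initialization runs exactly once and is thread-safe, so no
  // explicit lock or null check is needed.
  private static final class InstanceHolder {
    private static final LazyFactorySketch INSTANCE = new LazyFactorySketch();
  }

  static LazyFactorySketch instance() {
    return InstanceHolder.INSTANCE;
  }
}
```

One consequence of the idiom is that the instance lives in a `static final` field and cannot be replaced later, so any reset behaviour would need to be handled inside the instance itself.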

##########
File path: zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", name, maxConcurrency);

Review comment:
       Remove or Debug







[GitHub] [zeppelin] zjffdu commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r517381359



##########
File path: python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
##########
@@ -475,7 +475,7 @@ public void testIPythonFailToLaunch() throws InterpreterException {
       fail("Should not be able to start IPythonInterpreter");
     } catch (InterpreterException e) {
       String exceptionMsg = ExceptionUtils.getStackTrace(e);
-      assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
+      assertTrue(exceptionMsg, exceptionMsg.contains("java.io.IOException"));

Review comment:
       I still think it is better to check the exact error message instead of a generic `IOException`. I believe there should be a setting to turn off the error message translation.
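
As a possible middle ground between the two assertions in the hunk above, a test could match on the numeric error code rather than on the translated text. The sketch below assumes that the failure comes from `java.lang.ProcessBuilder`, whose `IOException` message on common JDKs contains a token such as `error=2,` in front of the localized description; whether the IPython launch path produces that message is an assumption and is not confirmed by this thread. `LaunchFailureSketch` and its methods are hypothetical names.

```java
import java.io.IOException;

// Hypothetical helper, not part of the PR.
final class LaunchFailureSketch {

  /** Tries to start a program and returns the IOException, or null if it started. */
  static IOException tryLaunch(String program) {
    try {
      Process process = new ProcessBuilder(program).start();
      process.destroy();
      return null;
    } catch (IOException e) {
      return e;
    }
  }

  /** Returns true if any message in the cause chain contains "error=<errno>,". */
  static boolean mentionsErrno(Throwable error, int errno) {
    String token = "error=" + errno + ",";
    for (Throwable t = error; t != null; t = t.getCause()) {
      String message = t.getMessage();
      if (message != null && message.contains(token)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    IOException failure = tryLaunch("/nonexistent/zeppelin-sketch/python");
    System.out.println("message: " + (failure == null ? "<started>" : failure.getMessage()));
    System.out.println("mentions errno 2: " + mentionsErrno(failure, 2));
  }
}
```

Matching on the code keeps the assertion specific to "file not found" while staying independent of the locale of the machine that runs the test.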







[GitHub] [zeppelin] zjffdu commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-704776702


   @Reamer It looks like shading thrift doesn't have any side effects; let me revert it.





[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-1075978893


   User impersonation via sudo is very messy. I recommend not using it.
   If you can't do without it, you might find an answer in @vprus's branch. Take a look at the comments on the commit.
   https://github.com/apache/zeppelin/commit/e4eaa5ee974465b24c2f3bf89ce14040d3559f7b#r48075769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@zeppelin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [zeppelin] Reamer commented on pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
Reamer commented on pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#issuecomment-703739551


   Ping @zjffdu: any ideas regarding https://github.com/apache/zeppelin/pull/3925#issuecomment-700567588 ?





[GitHub] [zeppelin] asfgit closed pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925


   





[GitHub] [zeppelin] zjffdu commented on a change in pull request #3925: [ZEPPELIN-5070] Improve start/shutdown and signal handling

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r517377022



##########
File path: zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
##########
@@ -171,7 +171,7 @@ public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
       sparkConfBuilder.append(" --proxy-user " + context.getUserName());
     }
 
-    env.put("ZEPPELIN_SPARK_CONF", escapeSpecialCharacter(sparkConfBuilder.toString()));
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());

Review comment:
       Makes sense. Thanks for the explanation.



