You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/28 03:59:42 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4890]. Support to run note in isolated mode in rest api

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 5ab4ffb  [ZEPPELIN-4890]. Support to run note in isolated mode in rest api
5ab4ffb is described below

commit 5ab4ffbf7c36091b8be10657427d63268ed802b4
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jun 23 14:10:49 2020 +0800

    [ZEPPELIN-4890]. Support to run note in isolated mode in rest api
    
    ### What is this PR for?
    
    This PR is to support to run note in isolated mode in rest api, that means all the interpreter in this rest api is isolated and will be released after the note is finished. This is used for other third party that use rest api to integrate with zeppelin to run note, e.g. airflow use it to run ETL job.
    
    ### What type of PR is it?
    [Improvement | Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4890
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3821 from zjffdu/ZEPPELIN-4890 and squashes the following commits:
    
    76f09955e [Jeff Zhang] [ZEPPELIN-4890]. Support to run note in isolated mode in rest api
    
    (cherry picked from commit 0e47b87d3d4d89c31336c57bdfbdac9294b243a3)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../zeppelin/integration/FlinkIntegrationTest.java |   4 +-
 .../zeppelin/integration/JdbcIntegrationTest.java  |   6 +-
 .../zeppelin/integration/SparkIntegrationTest.java |  12 +-
 .../YarnInterpreterLauncherIntegrationTest.java    |   8 +-
 .../zeppelin/interpreter/ExecutionContext.java     |  33 +++---
 .../interpreter/ExecutionContextBuilder.java       |  58 ++++++++++
 .../remote/RemoteInterpreterServer.java            |   2 +-
 .../apache/zeppelin/scheduler/ExecutorFactory.java |   9 ++
 .../apache/zeppelin/rest/InterpreterRestApi.java   |   4 +-
 .../org/apache/zeppelin/rest/NotebookRestApi.java  |  25 +++-
 .../apache/zeppelin/service/NotebookService.java   |   2 +-
 .../org/apache/zeppelin/recovery/RecoveryTest.java |  12 +-
 .../apache/zeppelin/rest/NotebookRestApiTest.java  | 114 ++++++++++++++++--
 .../apache/zeppelin/rest/ZeppelinRestApiTest.java  |  14 +--
 .../zeppelin/interpreter/InterpreterSetting.java   |  21 ++--
 .../interpreter/InterpreterSettingManager.java     |  11 +-
 .../java/org/apache/zeppelin/notebook/Note.java    | 127 ++++++++++++++++-----
 .../org/apache/zeppelin/notebook/Notebook.java     |   4 +-
 .../org/apache/zeppelin/notebook/Paragraph.java    |  32 ++++--
 .../zeppelin/notebook/scheduler/CronJob.java       |  39 +++----
 .../zeppelin/interpreter/ConfInterpreterTest.java  |   8 +-
 .../interpreter/InterpreterFactoryTest.java        |  24 ++--
 .../interpreter/InterpreterSettingManagerTest.java |   4 +-
 .../lifecycle/TimeoutLifecycleManagerTest.java     |  10 +-
 .../org/apache/zeppelin/notebook/NotebookTest.java |  25 ++--
 25 files changed, 434 insertions(+), 174 deletions(-)

diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index fcd22f6..dc6e562 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -100,7 +100,7 @@ public class FlinkIntegrationTest {
 
   private void testInterpreterBasics() throws IOException, InterpreterException {
     // test FlinkInterpreter
-    Interpreter flinkInterpreter = interpreterFactory.getInterpreter("flink", new ExecutionContext("user1", "note1", "flink"));
+    Interpreter flinkInterpreter = interpreterFactory.getInterpreter("flink", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
 
     InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
     InterpreterResult interpreterResult = flinkInterpreter.interpret("1+1", context);
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
index 9ec23d2..d9e7d35 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.integration;
 
 import com.google.common.collect.Lists;
 import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -70,7 +70,7 @@ public class JdbcIntegrationTest {
     interpreterSetting.setDependencies(Lists.newArrayList(dependency));
     interpreterSettingManager.restart(interpreterSetting.getId());
     interpreterSetting.waitForReady(60 * 1000);
-    Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContext("user1", "note1", "test"));
+    Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertNotNull("JdbcInterpreter is null", jdbcInterpreter);
 
     InterpreterContext context = new InterpreterContext.Builder()
@@ -89,7 +89,7 @@ public class JdbcIntegrationTest {
     assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
 
     // read table_1 from python interpreter
-    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContext("user1", "note1", "test"));
+    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertNotNull("PythonInterpreter is null", pythonInterpreter);
 
     context = new InterpreterContext.Builder()
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 42f35a6..bb34661 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -102,7 +102,7 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath());
 
     // test SparkInterpreter
-    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note1", "test"));
+    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("spark.spark", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
 
     InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
     InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
@@ -119,24 +119,24 @@ public abstract class SparkIntegrationTest {
     assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
 
     // test PySparkInterpreter
-    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("spark.pyspark", new ExecutionContext("user1", "note1", "test"));
+    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("spark.pyspark", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
     assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
 
     // test IPySparkInterpreter
-    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("spark.ipyspark", new ExecutionContext("user1", "note1", "test"));
+    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("spark.ipyspark", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
     assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
 
     // test SparkSQLInterpreter
-    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("spark.sql", new ExecutionContext("user1", "note1", "test"));
+    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("spark.sql", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     interpreterResult = sqlInterpreter.interpret("select count(1) as c from test", context);
     assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(interpreterResult.toString(), InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
     assertEquals(interpreterResult.toString(), "c\n2\n", interpreterResult.message().get(0).getData());
 
     // test SparkRInterpreter
-    Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test"));
+    Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     if (isSpark2() || isSpark3()) {
       interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
     } else {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java
index 445ce15..4f1543c 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -85,7 +85,7 @@ public class YarnInterpreterLauncherIntegrationTest {
     shellInterpreterSetting.setProperty("zeppelin.interpreter.launcher", "yarn");
     shellInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
 
-    Interpreter shellInterpreter = interpreterFactory.getInterpreter("sh", new ExecutionContext("user1", "note1", "sh"));
+    Interpreter shellInterpreter = interpreterFactory.getInterpreter("sh", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("sh").createExecutionContext());
 
     InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
     InterpreterResult interpreterResult = shellInterpreter.interpret("pwd", context);
@@ -121,7 +121,7 @@ public class YarnInterpreterLauncherIntegrationTest {
     pythonInterpreterSetting.setProperty("zeppelin.interpreter.yarn.resource.memory", "512");
     pythonInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
 
-    Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContext("user1", "note1", "test"));
+    Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertNotNull("JdbcInterpreter is null", jdbcInterpreter);
 
     InterpreterContext context = new InterpreterContext.Builder()
@@ -140,7 +140,7 @@ public class YarnInterpreterLauncherIntegrationTest {
     assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
 
     // read table_1 from python interpreter
-    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContext("user1", "note1", "test"));
+    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertNotNull("PythonInterpreter is null", pythonInterpreter);
 
     context = new InterpreterContext.Builder()
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
index e2bbe45..9474560 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
@@ -17,26 +17,26 @@
 
 package org.apache.zeppelin.interpreter;
 
+/**
+ * Context info about running interpreter. This is used for deciding which interpreter binding
+ * mode to use.
+ */
 public class ExecutionContext {
 
   private final String user;
   private final String noteId;
   private final String defaultInterpreterGroup;
-  private final boolean inCronMode;
-
-  public ExecutionContext(String user, String noteId) {
-    this(user, noteId, "", false);
-  }
+  private final boolean inIsolatedMode;
+  // When is the execution triggered, e.g. when the cron job is triggered or when the rest api is triggered.
+  private final String startTime;
 
-  public ExecutionContext(String user, String noteId, String defaultInterpreterGroup) {
-    this(user, noteId, defaultInterpreterGroup, false);
-  }
-
-  public ExecutionContext(String user, String noteId, String defaultInterpreterGroup, boolean inCronMode) {
+  public ExecutionContext(String user, String noteId, String defaultInterpreterGroup,
+                          boolean inIsolatedMode, String startTime) {
     this.user = user;
     this.noteId = noteId;
     this.defaultInterpreterGroup = defaultInterpreterGroup;
-    this.inCronMode = inCronMode;
+    this.inIsolatedMode = inIsolatedMode;
+    this.startTime = startTime;
   }
 
   public String getUser() {
@@ -51,8 +51,12 @@ public class ExecutionContext {
     return defaultInterpreterGroup;
   }
 
-  public boolean isInCronMode() {
-    return inCronMode;
+  public boolean isInIsolatedMode() {
+    return inIsolatedMode;
+  }
+
+  public String getStartTime() {
+    return startTime;
   }
 
   @Override
@@ -61,7 +65,8 @@ public class ExecutionContext {
             "user='" + user + '\'' +
             ", noteId='" + noteId + '\'' +
             ", defaultInterpreterGroup='" + defaultInterpreterGroup + '\'' +
-            ", inCronMode=" + inCronMode +
+            ", inIsolatedMode=" + inIsolatedMode +
+            ", startTime=" + startTime +
             '}';
   }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
new file mode 100644
index 0000000..bf8be9c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Builder class for ExecutionContext.
+ */
+public class ExecutionContextBuilder {
+  private String user;
+  private String noteId;
+  private String defaultInterpreterGroup = "";
+  private boolean inIsolatedMode = false;
+  private String startTime = "";
+
+  public ExecutionContextBuilder setUser(String user) {
+    this.user = user;
+    return this;
+  }
+
+  public ExecutionContextBuilder setNoteId(String noteId) {
+    this.noteId = noteId;
+    return this;
+  }
+
+  public ExecutionContextBuilder setDefaultInterpreterGroup(String defaultInterpreterGroup) {
+    this.defaultInterpreterGroup = defaultInterpreterGroup;
+    return this;
+  }
+
+  public ExecutionContextBuilder setInIsolatedMode(boolean inIsolatedMode) {
+    this.inIsolatedMode = inIsolatedMode;
+    return this;
+  }
+
+  public ExecutionContextBuilder setStartTime(String startTime) {
+    this.startTime = startTime;
+    return this;
+  }
+
+  public ExecutionContext createExecutionContext() {
+    return new ExecutionContext(user, noteId, defaultInterpreterGroup, inIsolatedMode, startTime);
+  }
+}
\ No newline at end of file
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 2d3396a..8953eda 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -243,7 +243,7 @@ public class RemoteInterpreterServer extends Thread
             }
           }
 
-          if ("yarn".endsWith(launcherEnv)) {
+          if (launcherEnv != null && "yarn".endsWith(launcherEnv)) {
             try {
               YarnUtils.register(host, port);
               Thread thread = new Thread(() -> {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
index 02b7e72..70c7e29 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -55,6 +55,15 @@ public class ExecutorFactory {
     }
   }
 
+  /**
+   * ThreadPool created for running note via rest api.
+   * TODO(zjffdu) Should use property to configure the thread pool size.
+   * @return
+   */
+  public ExecutorService getNoteJobExecutor() {
+    return createOrGet("NoteJobThread-", 50);
+  }
+
   public void shutdown(String name) {
     synchronized (executors) {
       if (executors.containsKey(name)) {
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index bb114c3..f8475d6 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -23,7 +23,7 @@ import javax.inject.Singleton;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.annotation.ZeppelinApi;
 import org.apache.zeppelin.dep.Repository;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterPropertyType;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -202,7 +202,7 @@ public class InterpreterRestApi {
         interpreterSettingManager.close(settingId);
       } else {
         interpreterSettingManager.restart(settingId,
-                new ExecutionContext(authenticationService.getPrincipal(), noteId));
+                new ExecutionContextBuilder().setUser(authenticationService.getPrincipal()).setNoteId(noteId).createExecutionContext());
       }
 
     } catch (InterpreterException e) {
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index ab72ec4..615b789 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.ws.rs.DELETE;
@@ -42,7 +43,10 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.annotation.ZeppelinApi;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.Notebook;
@@ -59,6 +63,7 @@ import org.apache.zeppelin.rest.message.NewParagraphRequest;
 import org.apache.zeppelin.rest.message.RenameNoteRequest;
 import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest;
 import org.apache.zeppelin.rest.message.UpdateParagraphRequest;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.server.JsonResponse;
 import org.apache.zeppelin.service.AuthenticationService;
@@ -650,6 +655,8 @@ public class NotebookRestApi extends AbstractRestApi {
    * Run note jobs REST API.
    *
    * @param noteId ID of Note
+   * @param blocking blocking until jobs are done
+   * @param isolated use isolated interpreter for running this note
    * @return JSON with status.OK
    * @throws IOException
    * @throws IllegalArgumentException
@@ -658,23 +665,31 @@ public class NotebookRestApi extends AbstractRestApi {
   @Path("job/{noteId}")
   @ZeppelinApi
   public Response runNoteJobs(@PathParam("noteId") String noteId,
-                              @QueryParam("waitToFinish") Boolean waitToFinish)
+                              @QueryParam("blocking") Boolean blocking,
+                              @QueryParam("isolated") Boolean isolated)
       throws IOException, IllegalArgumentException {
-    boolean blocking = waitToFinish == null || waitToFinish;
-    LOG.info("run note jobs {} waitToFinish: {}", noteId, blocking);
+    if (blocking == null) {
+      blocking = false;
+    }
+    if (isolated == null) {
+      isolated = false;
+    }
+
+    LOG.info("Run note jobs, noteId: {} blocking: {}, isolated: {}", noteId, blocking, isolated);
     Note note = notebook.getNote(noteId);
     AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
     subject.setRoles(new LinkedList<>(authenticationService.getAssociatedRoles()));
     checkIfNoteIsNotNull(note);
     checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
 
+    //TODO(zjffdu), can we run a note via rest api when cron is enabled ?
     try {
-      note.runAll(subject, blocking);
+      note.runAll(subject, blocking, isolated);
+      return new JsonResponse<>(Status.OK).build();
     } catch (Exception ex) {
       LOG.error("Exception from run", ex);
       return new JsonResponse<>(Status.EXPECTATION_FAILED, ex.getMessage()).build();
     }
-    return new JsonResponse<>(Status.OK).build();
   }
 
   /**
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index bc7788b..1d4493c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -419,7 +419,7 @@ public class NotebookService {
       } else {
         try {
           // run note directly when parameter `paragraphs` is null.
-          note.runAll(context.getAutheInfo(), true);
+          note.runAll(context.getAutheInfo(), true, false);
           return true;
         } catch (Exception e) {
           LOGGER.warn("Fail to run note: " + note.getName(), e);
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
index 8d9a686..85fea77 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
@@ -85,7 +85,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       // run python interpreter and create new variable `user`
       Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
       p1.setText("%python user='abc'");
-      PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() +"?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -102,7 +102,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId());
       p1 = note1.getParagraph(p1.getId());
       p1.setText("%python print(user)");
-      post = httpPost("/notebook/job/" + note1.getId(), "");
+      post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
       assertEquals(Job.Status.FINISHED, p1.getStatus());
@@ -127,7 +127,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       // run python interpreter and create new variable `user`
       Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
       p1.setText("%python user='abc'");
-      PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -150,7 +150,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId());
       p1 = note1.getParagraph(p1.getId());
       p1.setText("%python print(user)");
-      post = httpPost("/notebook/job/" + note1.getId(), "");
+      post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
       assertEquals(Job.Status.ERROR, p1.getStatus());
@@ -174,7 +174,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       // run python interpreter and create new variable `user`
       Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
       p1.setText("%python user='abc'");
-      PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -194,7 +194,7 @@ public class RecoveryTest extends AbstractTestRestApi {
       note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId());
       p1 = note1.getParagraph(p1.getId());
       p1.setText("%python print(user)");
-      post = httpPost("/notebook/job/" + note1.getId(), "");
+      post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
       assertEquals(Job.Status.ERROR, p1.getStatus());
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
index b4fbb3d..c4e898c 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
@@ -30,6 +30,8 @@ import com.google.gson.reflect.TypeToken;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.socket.NotebookServer;
 import org.apache.zeppelin.utils.TestUtils;
@@ -111,7 +113,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       Paragraph p = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
       // run blank paragraph
-      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -121,7 +123,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
 
       // run non-blank paragraph
       p.setText("test");
-      post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
+      post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -213,7 +215,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
       p2.setText("%python from __future__ import print_function\nprint(user)");
 
-      PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -252,7 +254,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)");
       p2.setText("%sh(interpolate=true) echo '{name}'");
 
-      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -274,6 +276,104 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
   }
 
   @Test
+  public void testRunNoteBlocking_Isolated() throws IOException {
+    Note note1 = null;
+    try {
+      InterpreterSettingManager interpreterSettingManager =
+              TestUtils.getInstance(InterpreterSettingManager.class);
+      InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("python");
+      int pythonProcessNum = interpreterSetting.getAllInterpreterGroups().size();
+
+      note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      // 2 paragraphs
+      // P1:
+      //    %python
+      //    import time
+      //    time.sleep(1)
+      //    user='abc'
+      // P2:
+      //    %python
+      //    from __future__ import print_function
+      //    print(user)
+      //
+      Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
+      p2.setText("%python from __future__ import print_function\nprint(user)");
+
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true&isolated=true", "");
+      assertThat(post, isAllowed());
+      Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
+              new TypeToken<Map<String, Object>>() {}.getType());
+      assertEquals(resp.get("status"), "OK");
+      post.releaseConnection();
+
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+      assertEquals(Job.Status.FINISHED, p2.getStatus());
+      assertEquals("abc\n", p2.getReturn().message().get(0).getData());
+
+      // no new python process is created because it is isolated mode.
+      assertEquals(pythonProcessNum, interpreterSetting.getAllInterpreterGroups().size());
+    } finally {
+      // cleanup
+      if (null != note1) {
+        TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+      }
+    }
+  }
+
+  @Test
+  public void testRunNoteNonBlocking_Isolated() throws IOException, InterruptedException {
+    Note note1 = null;
+    try {
+      InterpreterSettingManager interpreterSettingManager =
+              TestUtils.getInstance(InterpreterSettingManager.class);
+      InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("python");
+      int pythonProcessNum = interpreterSetting.getAllInterpreterGroups().size();
+
+      note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      // 2 paragraphs
+      // P1:
+      //    %python
+      //    import time
+      //    time.sleep(1)
+      //    user='abc'
+      // P2:
+      //    %python
+      //    from __future__ import print_function
+      //    print(user)
+      //
+      Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
+      p2.setText("%python from __future__ import print_function\nprint(user)");
+
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=false&isolated=true", "");
+      assertThat(post, isAllowed());
+      Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
+              new TypeToken<Map<String, Object>>() {}.getType());
+      assertEquals(resp.get("status"), "OK");
+      post.releaseConnection();
+
+      // wait for all the paragraphs are done
+      while(note1.isRunning()) {
+        Thread.sleep(1000);
+      }
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+      assertEquals(Job.Status.FINISHED, p2.getStatus());
+      assertEquals("abc\n", p2.getReturn().message().get(0).getData());
+
+      // no new python process is created because it is isolated mode.
+      assertEquals(pythonProcessNum, interpreterSetting.getAllInterpreterGroups().size());
+    } finally {
+      // cleanup
+      if (null != note1) {
+        TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+      }
+    }
+  }
+
+  @Test
   public void testRunAllParagraph_FirstFailed() throws IOException {
     Note note1 = null;
     try {
@@ -294,7 +394,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p1.setText("%python import time\ntime.sleep(1)\nfrom __future__ import print_function\nprint(user2)");
       p2.setText("%python user2='abc'\nprint(user2)");
 
-      PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post, isExpectationFailed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
@@ -467,7 +567,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
       p2.setText("%python from __future__ import print_function\nprint(user)");
 
-      PostMethod post1 = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post1 = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post1, isAllowed());
       post1.releaseConnection();
       PutMethod put = httpPut("/notebook/" + note1.getId() + "/clear", "");
@@ -483,7 +583,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p1 = note1.getParagraph(p1.getId());
       p2 = note1.getParagraph(p2.getId());
 
-      PostMethod post2 = httpPost("/notebook/job/" + note1.getId(), "");
+      PostMethod post2 = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
       assertThat(post2, isAllowed());
       Map<String, Object> resp = gson.fromJson(post2.getResponseBodyAsString(),
           new TypeToken<Map<String, Object>>() {}.getType());
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index f353429..630b631 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -447,7 +447,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       TestUtils.getInstance(Notebook.class).saveNote(note, anonymous);
       String noteId = note.getId();
 
-      note.runAll(anonymous, true);
+      note.runAll(anonymous, true, false);
       // wait until job is finished or timeout.
       int timeout = 1;
       while (!paragraph.isTerminated()) {
@@ -459,7 +459,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       }
 
       // Call Run note jobs REST API
-      PostMethod postNoteJobs = httpPost("/notebook/job/" + noteId, "");
+      PostMethod postNoteJobs = httpPost("/notebook/job/" + noteId + "?blocking=true", "");
       assertThat("test note jobs run:", postNoteJobs, isAllowed());
       postNoteJobs.releaseConnection();
 
@@ -509,7 +509,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       TestUtils.getInstance(Notebook.class).saveNote(note, anonymous);
       String noteId = note.getId();
 
-      note.runAll(anonymous, true);
+      note.runAll(anonymous, true, false);
       // assume that status of the paragraph is running
       GetMethod get = httpGet("/notebook/job/" + noteId);
       assertThat("test get note job: ", get, isAllowed());
@@ -563,7 +563,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       TestUtils.getInstance(Notebook.class).saveNote(note, anonymous);
       String noteId = note.getId();
 
-      note.runAll(anonymous, true);
+      note.runAll(anonymous, true, false);
 
       // Call Run paragraph REST API
       PostMethod postParagraph = httpPost("/notebook/job/" + noteId + "/" + paragraph.getId(),
@@ -601,7 +601,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       config.put("enabled", true);
       paragraph.setConfig(config);
 
-      note.runAll(AuthenticationInfo.ANONYMOUS, false);
+      note.runAll(AuthenticationInfo.ANONYMOUS, false, false);
 
       String jsonRequest = "{\"cron\":\"* * * * * ?\" }";
       // right cron expression but not exist note.
@@ -651,7 +651,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       config.put("enabled", true);
       paragraph.setConfig(config);
 
-      note.runAll(AuthenticationInfo.ANONYMOUS, false);
+      note.runAll(AuthenticationInfo.ANONYMOUS, false, false);
 
       String jsonRequest = "{\"cron\":\"* * * * * ?\" }";
       // right cron expression.
@@ -663,7 +663,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System");
 
       note.setName("System/test2");
-      note.runAll(AuthenticationInfo.ANONYMOUS, false);
+      note.runAll(AuthenticationInfo.ANONYMOUS, false, false);
       postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest);
       assertThat("", postCron, isAllowed());
       postCron.releaseConnection();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index e5ab0b2..c4a507a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -87,9 +87,6 @@ public class InterpreterSetting {
   private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
       "language", (Object) "text",
       "editOnDblClick", false);
-  private static final DateTimeFormatter DATE_TIME_FORMATTER =
-          DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
-
 
   public static String  PARAGRAPH_CONFIG_RUNONSELECTIONCHANGE = "runOnSelectionChange";
   public static String  PARAGRAPH_CONFIG_TITLE = "title";
@@ -424,9 +421,9 @@ public class InterpreterSetting {
   }
 
   private String getInterpreterGroupId(ExecutionContext executionContext) {
-    if (executionContext.isInCronMode()) {
-      return "cron-" + name + "-" + executionContext.getNoteId() + "-" +
-              DATE_TIME_FORMATTER.format(LocalDateTime.now());
+    if (executionContext.isInIsolatedMode()) {
+      return name + "-isolated-" + executionContext.getNoteId() + "-" +
+              executionContext.getStartTime();
     }
 
     List<String> keys = new ArrayList<>();
@@ -465,7 +462,7 @@ public class InterpreterSetting {
   }
 
   public ManagedInterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) {
-    return getOrCreateInterpreterGroup(new ExecutionContext(user, noteId));
+    return getOrCreateInterpreterGroup(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
   public ManagedInterpreterGroup getOrCreateInterpreterGroup(ExecutionContext executionContext) {
@@ -494,7 +491,7 @@ public class InterpreterSetting {
   }
 
   public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
-    return getInterpreterGroup(new ExecutionContext(user, noteId));
+    return getInterpreterGroup(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
   public ManagedInterpreterGroup getInterpreterGroup(ExecutionContext executionContext) {
@@ -533,7 +530,7 @@ public class InterpreterSetting {
   }
 
   public void closeInterpreters(String user, String noteId) {
-    closeInterpreters(new ExecutionContext(user, noteId));
+    closeInterpreters(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
   public void closeInterpreters(ExecutionContext executionContext) {
@@ -874,7 +871,7 @@ public class InterpreterSetting {
   }
 
   List<Interpreter> getOrCreateSession(String user, String noteId) {
-    return getOrCreateSession(new ExecutionContext(user, noteId));
+    return getOrCreateSession(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
   List<Interpreter> getOrCreateSession(ExecutionContext executionContext) {
@@ -885,7 +882,7 @@ public class InterpreterSetting {
   }
 
   public Interpreter getDefaultInterpreter(String user, String noteId) {
-    return getOrCreateSession(new ExecutionContext(user, noteId)).get(0);
+    return getOrCreateSession(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext()).get(0);
   }
 
   public Interpreter getDefaultInterpreter(ExecutionContext executionContext) {
@@ -893,7 +890,7 @@ public class InterpreterSetting {
   }
 
   public Interpreter getInterpreter(String user, String noteId, String replName) {
-    return getInterpreter(new ExecutionContext(user, noteId), replName);
+    return getInterpreter(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext(), replName);
   }
 
   public Interpreter getInterpreter(ExecutionContext executionContext, String replName) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 5338bfc..8b6a890 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -50,7 +50,6 @@ import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.notebook.ApplicationState;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteEventListener;
@@ -689,7 +688,11 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
   }
 
   public void removeResourcesBelongsToNote(String noteId) {
-    removeResourcesBelongsToParagraph(noteId, null);
+    try {
+      removeResourcesBelongsToParagraph(noteId, null);
+    } catch (Exception e) {
+      LOGGER.warn("Fail to remove resources", e);
+    }
   }
 
   /**
@@ -875,7 +878,7 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
 
   // restart in note page
   public void restart(String settingId, String user, String noteId) throws InterpreterException {
-    restart(settingId, new ExecutionContext(user, noteId));
+    restart(settingId, new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
   // restart in note page
@@ -1029,7 +1032,7 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
     // remove from all interpreter instance's angular object registry
     for (InterpreterSetting settings : interpreterSettings.values()) {
       InterpreterGroup interpreterGroup = settings.getInterpreterGroup(
-              new ExecutionContext(subject.getUser(), note.getId()));
+              new ExecutionContextBuilder().setUser(subject.getUser()).setNoteId(note.getId()).createExecutionContext());
       if (interpreterGroup != null) {
         AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
         if (registry instanceof RemoteAngularObjectRegistry) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 9fb8ba8..03b9c75 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -42,6 +43,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObject;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
@@ -50,6 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -67,8 +71,8 @@ import java.util.Set;
  * via this class.
  */
 public class Note implements JsonSerializable {
-  private static final Logger logger = LoggerFactory.getLogger(Note.class);
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(Note.class);
   // serialize Paragraph#runtimeInfos and Note#path to frontend but not to note file
   private static final ExclusionStrategy strategy = new ExclusionStrategy() {
     @Override
@@ -81,17 +85,17 @@ public class Note implements JsonSerializable {
       return false;
     }
   };
-
-  private static Gson gson = new GsonBuilder()
+  private static final Gson GSON = new GsonBuilder()
       .setPrettyPrinting()
       .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
       .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
       .registerTypeAdapterFactory(Input.TypeAdapterFactory)
       .setExclusionStrategies(strategy)
       .create();
+  private static final DateTimeFormatter DATE_TIME_FORMATTER =
+          DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
 
   private List<Paragraph> paragraphs = new LinkedList<>();
-
   private String name = "";
   private String id;
   private String defaultInterpreterGroup;
@@ -100,6 +104,7 @@ public class Note implements JsonSerializable {
   private Map<String, Object> noteParams = new LinkedHashMap<>();
   private Map<String, Input> noteForms = new LinkedHashMap<>();
   private Map<String, List<AngularObject>> angularObjects = new HashMap<>();
+
   /*
    * note configurations.
    * - looknfeel - cron
@@ -441,7 +446,7 @@ public class Note implements JsonSerializable {
     Map<String, Object> param = srcParagraph.settings.getParams();
     Map<String, Input> form = srcParagraph.settings.getForms();
 
-    logger.debug("srcParagraph user: " + srcParagraph.getUser());
+    LOGGER.debug("srcParagraph user: " + srcParagraph.getUser());
 
     newParagraph.setAuthenticationInfo(subject);
     newParagraph.setConfig(config);
@@ -450,15 +455,15 @@ public class Note implements JsonSerializable {
     newParagraph.setText(srcParagraph.getText());
     newParagraph.setTitle(srcParagraph.getTitle());
 
-    logger.debug("newParagraph user: " + newParagraph.getUser());
+    LOGGER.debug("newParagraph user: " + newParagraph.getUser());
 
     try {
-      String resultJson = gson.toJson(srcParagraph.getReturn());
+      String resultJson = GSON.toJson(srcParagraph.getReturn());
       InterpreterResult result = InterpreterResult.fromJson(resultJson);
       newParagraph.setReturn(result, null);
     } catch (Exception e) {
       // 'result' part of Note consists of exception, instead of actual interpreter results
-      logger.warn(
+      LOGGER.warn(
           "Paragraph " + srcParagraph.getId() + " has a result with exception. " + e.getMessage());
     }
 
@@ -747,15 +752,48 @@ public class Note implements JsonSerializable {
     }
   }
 
-  public void runAll(AuthenticationInfo authenticationInfo,
-                     boolean blocking) throws Exception {
+  /**
+   * Run all the paragraphs of this note in different kinds of ways:
+   * - blocking/non-blocking
+   * - isolated/non-isolated
+   *
+   * @param authInfo
+   * @param blocking
+   * @param isolated
+   * @throws Exception
+   */
+  public void runAll(AuthenticationInfo authInfo,
+                     boolean blocking,
+                     boolean isolated) throws Exception {
+    if (blocking) {
+      runAllSync(authInfo, isolated);
+    } else {
+      ExecutorFactory.singleton().getNoteJobExecutor().submit(() -> {
+        try {
+          runAllSync(authInfo, isolated);
+        } catch (Exception e) {
+          LOGGER.warn("Fail to run note: " + id, e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Run all the paragraphs in sync(blocking) way.
+   *
+   * @param authInfo
+   * @param isolated
+   */
+  private void runAllSync(AuthenticationInfo authInfo, boolean isolated) throws Exception {
+    setIsolatedMode(isolated);
     setRunning(true);
+    setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
     try {
       for (Paragraph p : getParagraphs()) {
         if (!p.isEnabled()) {
           continue;
         }
-        p.setAuthenticationInfo(authenticationInfo);
+        p.setAuthenticationInfo(authInfo);
         try {
           Interpreter interpreter = p.getBindedInterpreter();
           if (interpreter != null) {
@@ -767,17 +805,41 @@ public class Note implements JsonSerializable {
         } catch (InterpreterNotFoundException e) {
           // ignore, because the following run method will fail if interpreter not found.
         }
-        if (!run(p.getId(), blocking)) {
-          logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
-          throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " +
+        // Must run each paragraph in blocking way.
+        if (!run(p.getId(), true)) {
+          LOGGER.warn("Skip running the remain notes because paragraph {} fails", p.getId());
+          throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, result: " +
                   p.getReturn());
         }
       }
+    } catch (Exception e) {
+      throw e;
     } finally {
+      if (isolated) {
+        LOGGER.info("Releasing interpreters used by this note: {}", id);
+        for (InterpreterSetting setting : getUsedInterpreterSettings()) {
+          ExecutionContext executionContext = new ExecutionContextBuilder()
+                  .setUser(authInfo.getUser())
+                  .setNoteId(id)
+                  .setDefaultInterpreterGroup(defaultInterpreterGroup)
+                  .setInIsolatedMode(isolated)
+                  .setStartTime(getStartTime())
+                  .createExecutionContext();
+          setting.closeInterpreters(executionContext);
+        }
+      }
       setRunning(false);
+      setIsolatedMode(false);
+      clearStartTime();
     }
   }
 
+  /**
+   * Run a single paragraph in non-blocking way.
+   *
+   * @param paragraphId
+   * @return
+   */
   public boolean run(String paragraphId) {
     return run(paragraphId, false);
   }
@@ -786,6 +848,7 @@ public class Note implements JsonSerializable {
    * Run a single paragraph.
    *
    * @param paragraphId ID of paragraph
+   * @param blocking Whether run this paragraph in blocking way
    */
   public boolean run(String paragraphId, boolean blocking) {
     return run(paragraphId, blocking, null);
@@ -855,7 +918,7 @@ public class Note implements JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContext(user, id));
+      InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContextBuilder().setUser(user).setNoteId(id).createExecutionContext());
       if (intpGroup != null) {
         AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
         angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
@@ -872,10 +935,10 @@ public class Note implements JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      if (setting.getInterpreterGroup(new ExecutionContext(user, id)) == null) {
+      if (setting.getInterpreterGroup(new ExecutionContextBuilder().setUser(user).setNoteId(id).createExecutionContext()) == null) {
         continue;
       }
-      InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContext(user, id));
+      InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContextBuilder().setUser(user).setNoteId(id).createExecutionContext());
       AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
 
       if (registry instanceof RemoteAngularObjectRegistry) {
@@ -1016,13 +1079,25 @@ public class Note implements JsonSerializable {
     }
   }
 
-  public void setCronMode(boolean cronMode) {
-    info.put("inCronMode", cronMode);
+  public void setIsolatedMode(boolean isolatedMode) {
+    info.put("inIsolatedMode", isolatedMode);
   }
 
-  public boolean isCronMode() {
+  public boolean isIsolatedMode() {
     return Boolean.parseBoolean(
-            info.getOrDefault("inCronMode", "false").toString());
+            info.getOrDefault("inIsolatedMode", "false").toString());
+  }
+
+  public void setStartTime(String startTime) {
+    info.put("startTime", startTime);
+  }
+
+  public String getStartTime() {
+    return info.getOrDefault("startTime", "").toString();
+  }
+
+  public void clearStartTime() {
+    info.remove("startTime");
   }
 
   public boolean isRunning() {
@@ -1040,7 +1115,7 @@ public class Note implements JsonSerializable {
 
   @Override
   public String toJson() {
-    return gson.toJson(this);
+    return GSON.toJson(this);
   }
 
   /**
@@ -1052,13 +1127,13 @@ public class Note implements JsonSerializable {
    */
   public static Note fromJson(String json) throws IOException {
     try {
-      Note note = gson.fromJson(json, Note.class);
+      Note note = GSON.fromJson(json, Note.class);
       convertOldInput(note);
       note.info.remove("isRunning");
       note.postProcessParagraphs();
       return note;
     } catch (Exception e) {
-      logger.error("Fail to parse note json: " + e.toString());
+      LOGGER.error("Fail to parse note json: " + e.toString());
       throw new IOException("Fail to parse note json: " + json, e);
     }
   }
@@ -1132,8 +1207,8 @@ public class Note implements JsonSerializable {
   }
 
   @VisibleForTesting
-  public static Gson getGson() {
-    return gson;
+  public static Gson getGSON() {
+    return GSON;
   }
 
   public void setNoteEventListeners(List<NoteEventListener> noteEventListeners) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index be3a5bb..2727436 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -34,7 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -495,7 +495,7 @@ public class Notebook {
       List<InterpreterSetting> settings = interpreterSettingManager.get();
       for (InterpreterSetting setting : settings) {
         InterpreterGroup intpGroup = setting.getInterpreterGroup(
-                new ExecutionContext(subject.getUser(), note.getId()));
+                new ExecutionContextBuilder().setUser(subject.getUser()).setNoteId(note.getId()).createExecutionContext());
         if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) {
           AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
           String noteId = snapshot.getAngularObject().getNoteId();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 00cc926..30d3330 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,10 +28,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.common.JsonSerializable;
 import org.apache.zeppelin.display.AngularObject;
@@ -41,17 +39,15 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.helium.HeliumPackage;
 import org.apache.zeppelin.interpreter.Constants;
 import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.Interpreter.FormType;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
@@ -246,9 +242,15 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   }
 
   public Interpreter getBindedInterpreter() throws InterpreterNotFoundException {
-    return this.note.getInterpreterFactory().getInterpreter(intpText,
-        new ExecutionContext(user, note.getId(),
-                note.getDefaultInterpreterGroup(), note.isCronMode()));
+    ExecutionContext executionContext = new ExecutionContextBuilder()
+            .setUser(user)
+            .setNoteId(note.getId())
+            .setDefaultInterpreterGroup(note.getDefaultInterpreterGroup())
+            .setInIsolatedMode(note.isIsolatedMode())
+            .setStartTime(note.getStartTime())
+            .createExecutionContext();
+
+    return this.note.getInterpreterFactory().getInterpreter(intpText, executionContext);
   }
 
   public void setInterpreter(Interpreter interpreter) {
@@ -632,8 +634,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
 
   public boolean isValidInterpreter(String replName) {
     try {
-      return note.getInterpreterFactory().getInterpreter(replName,
-              new ExecutionContext(user, note.getId(), note.getDefaultInterpreterGroup())) != null;
+      ExecutionContext executionContext = new ExecutionContextBuilder()
+              .setUser(user)
+              .setNoteId(note.getId())
+              .setDefaultInterpreterGroup(note.getDefaultInterpreterGroup())
+              .setInIsolatedMode(note.isIsolatedMode())
+              .setStartTime(note.getStartTime())
+              .createExecutionContext();
+      return note.getInterpreterFactory().getInterpreter(replName, executionContext) != null;
     } catch (InterpreterNotFoundException e) {
       return false;
     }
@@ -750,11 +758,11 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
 
   @Override
   public String toJson() {
-    return Note.getGson().toJson(this);
+    return Note.getGSON().toJson(this);
   }
 
   public static Paragraph fromJson(String json) {
-    return Note.getGson().fromJson(json, Paragraph.class);
+    return Note.getGSON().fromJson(json, Paragraph.class);
   }
 
   public void updateOutputBuffer(int index, InterpreterResult.Type type, String output) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
index af47a57..95d08d3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook.scheduler;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -44,32 +45,20 @@ public class CronJob implements org.quartz.Job {
       return;
     }
 
+    String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser");
+    String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles");
+    if (null == cronExecutingUser) {
+      cronExecutingUser = "anonymous";
+    }
+    AuthenticationInfo authenticationInfo =
+            new AuthenticationInfo(
+                    cronExecutingUser,
+                    StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles,
+                    null);
     try {
-      note.setCronMode(true);
-
-      String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser");
-      String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles");
-      if (null == cronExecutingUser) {
-        cronExecutingUser = "anonymous";
-      }
-      AuthenticationInfo authenticationInfo =
-              new AuthenticationInfo(
-                      cronExecutingUser,
-                      StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles,
-                      null);
-      try {
-        note.runAll(authenticationInfo, true);
-      } catch (Exception e) {
-        LOGGER.warn("Fail to run note: " + note.getName(), e);
-      }
-
-      LOGGER.info("Releasing interpreters used by this note: " + note.getId());
-      for (InterpreterSetting setting : note.getUsedInterpreterSettings()) {
-          setting.closeInterpreters(new ExecutionContext(cronExecutingUser, note.getId(),
-                  note.getDefaultInterpreterGroup(), true));
-      }
-    } finally {
-      note.setCronMode(false);
+      note.runAll(authenticationInfo, true, true);
+    } catch (Exception e) {
+      LOGGER.warn("Fail to run note: " + note.getName(), e);
     }
   }
 }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
index ab32d10..cff05d4 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
@@ -20,14 +20,16 @@ package org.apache.zeppelin.interpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.junit.Test;
 
-import java.io.IOException;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ConfInterpreterTest extends AbstractInterpreterTest {
 
-  private ExecutionContext executionContext = new ExecutionContext("user1", "note1", "test");
+  private ExecutionContext executionContext = new ExecutionContextBuilder()
+          .setUser("user1")
+          .setNoteId("note1")
+          .setDefaultInterpreterGroup("test")
+          .createExecutionContext();
 
   @Test
   public void testCorrectConf() throws InterpreterException {
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 87d65cd..76188b6 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -29,32 +29,32 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest {
   @Test
   public void testGetFactory() throws InterpreterException {
 
-    assertTrue(interpreterFactory.getInterpreter("", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     // EchoInterpreter is the default interpreter because test is the default interpreter group
     assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    assertTrue(interpreterFactory.getInterpreter("double_echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("double_echo", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("double_echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("double_echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    assertTrue(interpreterFactory.getInterpreter("test", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("test", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    assertTrue(interpreterFactory.getInterpreter("test2", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("test2", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    assertTrue(interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
   }
 
   @Test
   public void testUnknownRepl1() {
     try {
-      interpreterFactory.getInterpreter("test.unknown_repl", new ExecutionContext("user1", "note1", "test"));
+      interpreterFactory.getInterpreter("test.unknown_repl", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
       fail("should fail due to no such interpreter");
     } catch (InterpreterNotFoundException e) {
       assertEquals("No such interpreter: test.unknown_repl", e.getMessage());
@@ -64,7 +64,7 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest {
   @Test
   public void testUnknownRepl2() {
     try {
-      interpreterFactory.getInterpreter("unknown_repl", new ExecutionContext("user1", "note1", "test"));
+      interpreterFactory.getInterpreter("unknown_repl", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
       fail("should fail due to no such interpreter");
     } catch (InterpreterNotFoundException e) {
       assertEquals("No such interpreter: unknown_repl", e.getMessage());
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
index 61eefae..e18ac0b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
@@ -192,13 +192,13 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
 
   //@Test
   public void testGetEditor() throws IOException, InterpreterNotFoundException {
-    Interpreter echoInterpreter = interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test"));
+    Interpreter echoInterpreter = interpreterFactory.getInterpreter("test.echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     // get editor setting from interpreter-setting.json
     Map<String, Object> editor = interpreterSettingManager.getEditorSetting("test.echo", "note1");
     assertEquals("java", editor.get("language"));
 
     // when editor setting doesn't exit, return the default editor
-    Interpreter mock1Interpreter = interpreterFactory.getInterpreter("mock1", new ExecutionContext("user1", "note1", "test"));
+    Interpreter mock1Interpreter = interpreterFactory.getInterpreter("mock1", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     editor = interpreterSettingManager.getEditorSetting("mock1", "note1");
     assertEquals("text", editor.get("language"));
   }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
index 3630d72..a0cdc29 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.interpreter.lifecycle;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -47,8 +47,8 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
 
   @Test
   public void testTimeout_1() throws InterpreterException, InterruptedException, IOException {
-    assertTrue(interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("test.echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.echo", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
     assertFalse(remoteInterpreter.isOpened());
     InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
     assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
@@ -71,8 +71,8 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
 
   @Test
   public void testTimeout_2() throws InterpreterException, InterruptedException, IOException {
-    assertTrue(interpreterFactory.getInterpreter("test.sleep", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter);
-    final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.sleep", new ExecutionContext("user1", "note1", "test"));
+    assertTrue(interpreterFactory.getInterpreter("test.sleep", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext()) instanceof RemoteInterpreter);
+    final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.sleep", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
 
     // simulate how zeppelin submit paragraph
     remoteInterpreter.getScheduler().submit(new Job("test-job", null) {
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 029b3ab..aeb9a8e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -22,7 +22,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
-import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.ExecutionContextBuilder;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
@@ -35,7 +35,6 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
 import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
-import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
@@ -476,7 +475,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     p3.setText("%mock1 p3");
 
     // when
-    note.runAll(anonymous, true);
+    note.runAll(anonymous, true, false);
 
     assertEquals("repl1: p1", p1.getReturn().message().get(0).getData());
     assertNull(p2.getReturn());
@@ -695,9 +694,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     schedulerService.refreshCron(note.getId());
 
 
-    RemoteInterpreter mock1 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContext(anonymous.getUser(), note.getId(), "test"));
+    RemoteInterpreter mock1 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContextBuilder().setUser(anonymous.getUser()).setNoteId(note.getId()).setDefaultInterpreterGroup("test").createExecutionContext());
 
-    RemoteInterpreter mock2 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContext(anonymous.getUser(), note.getId(), "test"));
+    RemoteInterpreter mock2 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContextBuilder().setUser(anonymous.getUser()).setNoteId(note.getId()).setDefaultInterpreterGroup("test").createExecutionContext());
 
     // wait until interpreters are started
     while (!mock1.isOpened() || !mock2.isOpened()) {
@@ -734,7 +733,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
       }
     });
     RemoteInterpreter cronNoteInterpreter =
-        (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContext(anonymous.getUser(), cronNote.getId(), "test"));
+        (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContextBuilder().setUser(anonymous.getUser()).setNoteId(cronNote.getId()).setDefaultInterpreterGroup("test").createExecutionContext());
 
     // create a paragraph of the cron scheduled note.
     Paragraph cronNoteParagraph = cronNote.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -749,7 +748,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     Note anotherNote = notebook.createNote("note1", anonymous);
 
     RemoteInterpreter anotherNoteInterpreter =
-        (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContext(anonymous.getUser(), anotherNote.getId(), "test"));
+        (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContextBuilder().setUser(anonymous.getUser()).setNoteId(anotherNote.getId()).setDefaultInterpreterGroup("test").createExecutionContext());
 
     // create a paragraph of another note
     Paragraph anotherNoteParagraph = anotherNote.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -830,7 +829,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     String simpleText = "hello world";
     p.setText(simpleText);
 
-    note.runAll(anonymous, true);
+    note.runAll(anonymous, true, false);
 
     String exportedNoteJson = notebook.exportNote(note.getId());
 
@@ -862,7 +861,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
 
     final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p.setText("hello world");
-    note.runAll(anonymous, true);
+    note.runAll(anonymous, true, false);
 
     p.setStatus(Status.RUNNING);
     Note cloneNote = notebook.cloneNote(note.getId(), "clone note", anonymous);
@@ -898,7 +897,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     for (InterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
       intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
     }
-    note.runAll(anonymous, true);
+    note.runAll(anonymous, true, false);
 
     assertEquals(2, interpreterSettingManager.getAllResources().size());
 
@@ -1167,14 +1166,14 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     p3.setText("%mock1 sleep 1000");
 
 
-    note.runAll(AuthenticationInfo.ANONYMOUS, false);
+    note.runAll(AuthenticationInfo.ANONYMOUS, false, false);
 
     // wait until first paragraph finishes and second paragraph starts
     while (p1.getStatus() != Status.FINISHED || p2.getStatus() != Status.RUNNING) Thread.yield();
 
     assertEquals(Status.FINISHED, p1.getStatus());
     assertEquals(Status.RUNNING, p2.getStatus());
-    assertEquals(Status.PENDING, p3.getStatus());
+    assertEquals(Status.READY, p3.getStatus());
 
     // restart interpreter
     interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettingByName("mock1").getId());
@@ -1182,7 +1181,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     // make sure three different status aborted well.
     assertEquals(Status.FINISHED, p1.getStatus());
     assertEquals(Status.ABORT, p2.getStatus());
-    assertEquals(Status.ABORT, p3.getStatus());
+    assertEquals(Status.READY, p3.getStatus());
 
     notebook.removeNote(note.getId(), anonymous);
   }