You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/10 02:19:12 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4845]. Recover running paragraph when recovery is enabled

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new eba5193  [ZEPPELIN-4845]. Recover running paragraph when recovery is enabled
eba5193 is described below

commit eba51930ac4ca5c194b7fa8262709656d484b9f9
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 1 10:03:26 2020 +0800

    [ZEPPELIN-4845]. Recover running paragraph when recovery is enabled
    
    ### What is this PR for?
    
    This PR is to recover the running paragraph when recovery is enabled.
    1. Get all the running Interpreter processes from RecoveryStorage.
    2. Reconnect to the running interpreter process.
    3. Resubmit the paragraph to interpreter process.
    
    ### What type of PR is it?
    [ Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4845
    
    ### How should this be tested?
    * CI pass and manully tested the recovery scenairo.
    
    ### Screenshots (if appropriate)
    
    ![ezgif com-video-to-gif (1)](https://user-images.githubusercontent.com/164491/83387264-013df300-a41f-11ea-9170-091af76037e0.gif)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3781 from zjffdu/ZEPPELIN-4845 and squashes the following commits:
    
    2302471de [Jeff Zhang] [ZEPPELIN-4845]. Recover running paragraph when recovery is enabled
    
    (cherry picked from commit 446608f1064af07f604849b6caf9e01162efd0b6)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  |   3 +
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |   1 +
 .../zeppelin/interpreter/InterpreterContext.java   |  16 +
 .../zeppelin/interpreter/InterpreterOutput.java    |   2 -
 .../interpreter/launcher/InterpreterClient.java    |  41 +-
 .../launcher/InterpreterLaunchContext.java         |  20 +-
 .../interpreter/launcher/InterpreterLauncher.java  |  50 +-
 .../interpreter/recovery/RecoveryStorage.java      |  31 +-
 .../remote/RemoteInterpreterEventClient.java       |  12 +-
 .../remote/RemoteInterpreterServer.java            | 289 ++++---
 .../interpreter/thrift/AngularObjectId.java        |   2 +-
 .../interpreter/thrift/AppOutputAppendEvent.java   |   2 +-
 .../interpreter/thrift/AppOutputUpdateEvent.java   |   2 +-
 .../interpreter/thrift/AppStatusUpdateEvent.java   |   2 +-
 .../interpreter/thrift/InterpreterCompletion.java  |   2 +-
 .../interpreter/thrift/OutputAppendEvent.java      |   2 +-
 .../interpreter/thrift/OutputUpdateAllEvent.java   |   2 +-
 .../interpreter/thrift/OutputUpdateEvent.java      |   2 +-
 .../zeppelin/interpreter/thrift/ParagraphInfo.java |   2 +-
 .../zeppelin/interpreter/thrift/RegisterInfo.java  |   2 +-
 .../thrift/RemoteApplicationResult.java            |   2 +-
 .../thrift/RemoteInterpreterContext.java           |   2 +-
 .../interpreter/thrift/RemoteInterpreterEvent.java |   2 +-
 .../thrift/RemoteInterpreterEventService.java      |   2 +-
 .../thrift/RemoteInterpreterEventType.java         |   2 +-
 .../thrift/RemoteInterpreterResult.java            |   2 +-
 .../thrift/RemoteInterpreterResultMessage.java     |   2 +-
 .../thrift/RemoteInterpreterService.java           | 870 ++++++++++++++++++++-
 .../interpreter/thrift/RunParagraphsEvent.java     |   2 +-
 .../interpreter/thrift/ServiceException.java       |   2 +-
 .../main/thrift/RemoteInterpreterService.thrift    |   1 +
 .../launcher/ClusterInterpreterLauncher.java       |  16 +-
 .../launcher/ClusterInterpreterProcess.java        |   8 +-
 .../launcher/DockerInterpreterLauncher.java        |   6 +-
 .../launcher/DockerInterpreterProcess.java         |  16 +-
 .../launcher/DockerInterpreterProcessTest.java     |   4 +-
 .../launcher/K8sRemoteInterpreterProcess.java      |  14 +-
 .../launcher/K8sStandardInterpreterLauncher.java   |  10 +-
 .../launcher/K8sRemoteInterpreterProcessTest.java  |  20 +-
 .../org/apache/zeppelin/socket/NotebookServer.java |   8 +-
 .../org/apache/zeppelin/recovery/RecoveryTest.java | 153 +++-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |   8 +-
 .../interpreter/InterpreterSettingManager.java     |  14 +-
 .../interpreter/ManagedInterpreterGroup.java       |  25 +-
 .../interpreter/RemoteInterpreterEventServer.java  |   3 +-
 .../launcher/SparkInterpreterLauncher.java         |   1 -
 .../launcher/StandardInterpreterLauncher.java      |  28 +-
 .../recovery/FileSystemRecoveryStorage.java        |  39 +-
 .../interpreter/recovery/NullRecoveryStorage.java  |   3 +-
 .../interpreter/recovery/StopInterpreter.java      |   7 +-
 .../remote/RemoteInterpreterManagedProcess.java    |  14 +-
 .../remote/RemoteInterpreterProcess.java           |  32 +-
 .../remote/RemoteInterpreterRunningProcess.java    |  12 +-
 .../java/org/apache/zeppelin/notebook/Note.java    |   8 +-
 .../org/apache/zeppelin/notebook/Notebook.java     |  29 +
 .../org/apache/zeppelin/notebook/Paragraph.java    |  57 +-
 .../remote/RemoteInterpreterOutputTestStream.java  |   1 -
 .../zeppelin/scheduler/RemoteSchedulerTest.java    |   1 -
 58 files changed, 1604 insertions(+), 307 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 3c3125f..567b940 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -83,6 +83,9 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
       String output = buildResult();
       context.out.write(output);
       context.out.flush();
+      // should checkpoint the html output, otherwise frontend won't display the output
+      // after recovering.
+      context.getIntpEventClient().checkpointOutput(context.getNoteId(), context.getParagraphId());
       isFirstRefresh = false;
     }
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 625a8ed..7922c99 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -653,6 +653,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   def close(): Unit = {
+    LOGGER.info("Closing FlinkScalaInterpreter")
     if (properties.getProperty("flink.interpreter.close.shutdown_cluster", "true").toBoolean) {
       if (cluster != null) {
         cluster match {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 2b56971..7a215ef 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -25,12 +25,14 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Interpreter context
  */
 public class InterpreterContext {
   private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
+  private static final ConcurrentHashMap<Thread, InterpreterContext> allContexts = new ConcurrentHashMap();
 
   public InterpreterOutput out;
 
@@ -40,10 +42,16 @@ public class InterpreterContext {
 
   public static void set(InterpreterContext ic) {
     threadIC.set(ic);
+    allContexts.put(Thread.currentThread(), ic);
   }
 
   public static void remove() {
     threadIC.remove();
+    allContexts.remove(Thread.currentThread());
+  }
+
+  public static ConcurrentHashMap<Thread, InterpreterContext> getAllContexts() {
+    return allContexts;
   }
 
   private String noteId;
@@ -241,10 +249,18 @@ public class InterpreterContext {
     return angularObjectRegistry;
   }
 
+  public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
+    this.angularObjectRegistry = angularObjectRegistry;
+  }
+
   public ResourcePool getResourcePool() {
     return resourcePool;
   }
 
+  public void setResourcePool(ResourcePool resourcePool) {
+    this.resourcePool = resourcePool;
+  }
+
   public String getInterpreterClassName() {
     return interpreterClassName;
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index ef1aafb..aaf6eda 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -59,7 +59,6 @@ public class InterpreterOutput extends OutputStream {
   public InterpreterOutput(InterpreterOutputListener flushListener) {
     this.flushListener = flushListener;
     changeListener = null;
-    clear();
   }
 
   public InterpreterOutput(InterpreterOutputListener flushListener,
@@ -67,7 +66,6 @@ public class InterpreterOutput extends OutputStream {
       throws IOException {
     this.flushListener = flushListener;
     this.changeListener = listener;
-    clear();
   }
 
   public void setType(InterpreterResult.Type type) throws IOException {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
index 73c8ef0..65243c7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
@@ -21,22 +21,59 @@ import java.io.IOException;
 
 /**
  * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
- * that is used to for the communication from zeppelin-server process to zeppelin interpreter
- * process.
+ * that is used for the communication from zeppelin-server process to zeppelin interpreter
+ * process and also manage the lifecycle of interpreter process.
  */
 public interface InterpreterClient {
 
+  /**
+   * InterpreterGroupId that is associated with this interpreter process.
+   *
+   * @return
+   */
   String getInterpreterGroupId();
 
+  /**
+   * InterpreterSetting name of this interpreter process.
+   *
+   * @return
+   */
   String getInterpreterSettingName();
 
+  /**
+   * Start interpreter process.
+   *
+   * @param userName
+   * @throws IOException
+   */
   void start(String userName) throws IOException;
 
+  /**
+   * Stop interpreter process.
+   *
+   */
   void stop();
 
+  /**
+   * Host name of interpreter process thrift server
+   *
+   * @return
+   */
   String getHost();
 
+  /**
+   * Port of interpreter process thrift server
+   *
+   * @return
+   */
   int getPort();
 
   boolean isRunning();
+
+  /**
+   * Return true if recovering successfully, otherwise return false.
+   *
+   * @return
+   */
+  boolean recover();
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
index 136d866..cc90a63 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
@@ -35,8 +35,8 @@ public class InterpreterLaunchContext {
   private String interpreterSettingId;
   private String interpreterSettingGroup;
   private String interpreterSettingName;
-  private int zeppelinServerRPCPort;
-  private String zeppelinServerHost;
+  private int intpEventServerPort;
+  private String intpEventServerHost;
 
   public InterpreterLaunchContext(Properties properties,
                                   InterpreterOption option,
@@ -46,8 +46,8 @@ public class InterpreterLaunchContext {
                                   String interpreterSettingId,
                                   String interpreterSettingGroup,
                                   String interpreterSettingName,
-                                  int zeppelinServerRPCPort,
-                                  String zeppelinServerHost) {
+                                  int intpEventServerPort,
+                                  String intpEventServerHost) {
     this.properties = properties;
     this.option = option;
     this.runner = runner;
@@ -56,8 +56,8 @@ public class InterpreterLaunchContext {
     this.interpreterSettingId = interpreterSettingId;
     this.interpreterSettingGroup = interpreterSettingGroup;
     this.interpreterSettingName = interpreterSettingName;
-    this.zeppelinServerRPCPort = zeppelinServerRPCPort;
-    this.zeppelinServerHost = zeppelinServerHost;
+    this.intpEventServerPort = intpEventServerPort;
+    this.intpEventServerHost = intpEventServerHost;
   }
 
   public Properties getProperties() {
@@ -92,11 +92,11 @@ public class InterpreterLaunchContext {
     return userName;
   }
 
-  public int getZeppelinServerRPCPort() {
-    return zeppelinServerRPCPort;
+  public int getIntpEventServerPort() {
+    return intpEventServerPort;
   }
 
-  public String getZeppelinServerHost() {
-    return zeppelinServerHost;
+  public String getIntpEventServerHost() {
+    return intpEventServerHost;
   }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 8e8bf53..1fb2ea9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -18,7 +18,11 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -28,6 +32,7 @@ import java.util.Properties;
  */
 public abstract class InterpreterLauncher {
 
+  private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
   private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";
 
   protected ZeppelinConfiguration zConf;
@@ -43,6 +48,11 @@ public abstract class InterpreterLauncher {
     this.properties = props;
   }
 
+  /**
+   * The timeout setting in interpreter setting take precedence over
+   * that in zeppelin-site.xml
+   * @return
+   */
   protected int getConnectTimeout() {
     int connectTimeout =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
@@ -65,5 +75,43 @@ public abstract class InterpreterLauncher {
     return builder.toString();
   }
 
-  public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
+  /**
+   * Try to recover interpreter process first, then call launchDirectly via sub class implementation.
+   *
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+    // try to recover it first
+    if (zConf.isRecoveryEnabled()) {
+      InterpreterClient recoveredClient =
+              recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
+      if (recoveredClient != null) {
+        if (recoveredClient.isRunning()) {
+          LOGGER.info("Recover interpreter process running at {} of interpreter group: {}",
+                  recoveredClient.getHost() + ":" + recoveredClient.getPort(),
+                  recoveredClient.getInterpreterGroupId());
+          return recoveredClient;
+        } else {
+          recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId());
+          LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":"
+                  + recoveredClient.getPort() + ", as it is already terminated.");
+        }
+      }
+    }
+
+    // launch it via sub class implementation without recovering.
+    return launchDirectly(context);
+  }
+
+  /**
+   * launch interpreter process directly without recovering.
+   *
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  public abstract InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException;
+
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
index 8bbe830..b7c87c0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
@@ -21,11 +21,13 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 
 /**
  * Interface for storing interpreter process recovery metadata.
+ * Just store mapping between interpreterGroupId to interpreter process host:ip
  *
  */
 public abstract class RecoveryStorage {
@@ -33,12 +35,15 @@ public abstract class RecoveryStorage {
   protected ZeppelinConfiguration zConf;
   protected Map<String, InterpreterClient> restoredClients;
 
-  public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
+  // TODO(zjffdu) The constructor is inconsistent between base class and its implementation.
+  //  The implementation actually use InterpreterSettingManager, the interface should also use it.
+  public RecoveryStorage(ZeppelinConfiguration zConf) {
     this.zConf = zConf;
   }
 
   /**
    * Update RecoveryStorage when new InterpreterClient is started
+   *
    * @param client
    * @throws IOException
    */
@@ -46,6 +51,7 @@ public abstract class RecoveryStorage {
 
   /**
    * Update RecoveryStorage when InterpreterClient is stopped
+   *
    * @param client
    * @throws IOException
    */
@@ -55,7 +61,7 @@ public abstract class RecoveryStorage {
    *
    * It is only called when Zeppelin Server is started.
    *
-   * @return
+   * @return Map between interpreterGroupId to InterpreterClient
    * @throws IOException
    */
   public abstract Map<String, InterpreterClient> restore() throws IOException;
@@ -67,9 +73,24 @@ public abstract class RecoveryStorage {
    * @throws IOException
    */
   public void init() throws IOException {
-    this.restoredClients = restore();
+    Map<String, InterpreterClient> restoredClientsInStorage= restore();
+    this.restoredClients = new HashMap<String, InterpreterClient>();
+    for (Map.Entry<String, InterpreterClient> entry : restoredClientsInStorage.entrySet()) {
+      if (entry.getValue().recover()) {
+        this.restoredClients.put(entry.getKey(), entry.getValue());
+      } else {
+        onInterpreterClientStop(entry.getValue());
+      }
+    }
   }
 
+  /**
+   * Get InterpreterClient that is associated with this interpreterGroupId, return null when there's
+   * no such InterpreterClient.
+   *
+   * @param interpreterGroupId
+   * @return InterpreterClient
+   */
   public InterpreterClient getInterpreterClient(String interpreterGroupId) {
     if (restoredClients.containsKey(interpreterGroupId)) {
       return restoredClients.get(interpreterGroupId);
@@ -77,4 +98,8 @@ public abstract class RecoveryStorage {
       return null;
     }
   }
+
+  public void removeInterpreterClient(String interpreterGroupId) {
+    this.restoredClients.remove(interpreterGroupId);
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 3de4e10..eadbf24 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -33,6 +33,7 @@ import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent;
 import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
 import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
+import org.apache.zeppelin.interpreter.thrift.RegisterInfo;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
 import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
 import org.apache.zeppelin.interpreter.thrift.ServiceException;
@@ -62,9 +63,9 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
   private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient;
   private String intpGroupId;
 
-  public RemoteInterpreterEventClient(String host, int port) {
+  public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) {
     this.remoteClient = new PooledRemoteClient<>(() -> {
-      TSocket transport = new TSocket(host, port);
+      TSocket transport = new TSocket(intpEventHost, intpEventPort);
       try {
         transport.open();
       } catch (TTransportException e) {
@@ -83,6 +84,13 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
     this.intpGroupId = intpGroupId;
   }
 
+  public void registerInterpreterProcess(RegisterInfo registerInfo) {
+    callRemoteFunction(client -> {
+      client.registerInterpreterProcess(registerInfo);
+      return null;
+    });
+  }
+
   /**
    * Get all resources except for specific resourcePool
    *
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 9c48f01..d711d3a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -90,7 +90,6 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -99,6 +98,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
@@ -109,7 +111,7 @@ import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META
 public class RemoteInterpreterServer extends Thread
     implements RemoteInterpreterService.Iface {
 
-  private static Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
+  private static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterServer.class);
 
   private String interpreterGroupId;
   private InterpreterGroup interpreterGroup;
@@ -120,11 +122,10 @@ public class RemoteInterpreterServer extends Thread
   private Gson gson = new Gson();
 
   private String intpEventServerHost;
+  private int intpEventServerPort;
   private String host;
   private int port;
   private TThreadPoolServer server;
-  RemoteInterpreterEventService.Client intpEventServiceClient;
-
   RemoteInterpreterEventClient intpEventClient;
   private DependencyResolver depLoader;
 
@@ -138,11 +139,18 @@ public class RemoteInterpreterServer extends Thread
   // Hold information for manual progress update
   private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();
 
+  // keep track of the running jobs for job recovery.
+  private ConcurrentMap<String, InterpretJob> runningJobs = new ConcurrentHashMap<>();
+  // cache result threshold, result cache is for purpose of recover paragraph even after
+  // paragraph is finished
+  private int resultCacheInSeconds;
+  private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor();
+
   private boolean isTest;
 
   // cluster manager client
-  ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
-  ClusterManagerClient clusterManagerClient;
+  private ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+  private ClusterManagerClient clusterManagerClient;
 
   public RemoteInterpreterServer(String intpEventServerHost,
                                  int intpEventServerPort,
@@ -158,15 +166,12 @@ public class RemoteInterpreterServer extends Thread
                                  String interpreterGroupId,
                                  boolean isTest)
       throws TTransportException, IOException {
-    logger.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port,
+    LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port,
             intpEventServerHost, intpEventServerPort);
     if (null != intpEventServerHost) {
       this.intpEventServerHost = intpEventServerHost;
+      this.intpEventServerPort = intpEventServerPort;
       if (!isTest) {
-        TTransport transport = new TSocket(intpEventServerHost, intpEventServerPort);
-        transport.open();
-        TProtocol protocol = new TBinaryProtocol(transport);
-        intpEventServiceClient = new RemoteInterpreterEventService.Client(protocol);
         intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
       }
     } else {
@@ -185,7 +190,7 @@ public class RemoteInterpreterServer extends Thread
       serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
       this.port = serverTransport.getServerSocket().getLocalPort();
       this.host = RemoteInterpreterUtils.findAvailableHostAddress();
-      logger.info("Launching ThriftServer at " + this.host + ":" + this.port);
+      LOGGER.info("Launching ThriftServer at " + this.host + ":" + this.port);
     }
     server = new TThreadPoolServer(
         new TThreadPoolServer.Args(serverTransport).processor(processor));
@@ -223,13 +228,15 @@ public class RemoteInterpreterServer extends Thread
             if (!interrupted) {
               RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
               try {
-                intpEventServiceClient.registerInterpreterProcess(registerInfo);
-              } catch (TException e) {
-                logger.error("Error while registering interpreter: {}", registerInfo, e);
+                LOGGER.info("Registering interpreter process");
+                intpEventClient.registerInterpreterProcess(registerInfo);
+                LOGGER.info("Registered interpreter process");
+              } catch (Exception e) {
+                LOGGER.error("Error while registering interpreter: {}", registerInfo, e);
                 try {
                   shutdown();
                 } catch (TException e1) {
-                  logger.warn("Exception occurs while shutting down", e1);
+                  LOGGER.warn("Exception occurs while shutting down", e1);
                 }
               }
             }
@@ -243,7 +250,7 @@ public class RemoteInterpreterServer extends Thread
   @Override
   public void shutdown() throws TException {
     Thread shutDownThread = new Thread(() -> {
-      logger.info("Shutting down...");
+      LOGGER.info("Shutting down...");
       // delete interpreter cluster meta
       deleteClusterMeta();
 
@@ -254,7 +261,7 @@ public class RemoteInterpreterServer extends Thread
               try {
                 interpreter.close();
               } catch (InterpreterException e) {
-                logger.warn("Fail to close interpreter", e);
+                LOGGER.warn("Fail to close interpreter", e);
               }
             }
           }
@@ -276,16 +283,16 @@ public class RemoteInterpreterServer extends Thread
         try {
           Thread.sleep(300);
         } catch (InterruptedException e) {
-          logger.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e);
+          LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e);
         }
       }
 
       if (server.isServing()) {
-        logger.info("Force shutting down");
+        LOGGER.info("Force shutting down");
         System.exit(0);
       }
 
-      logger.info("Shutting down");
+      LOGGER.info("Shutting down");
     }, "Shutdown-Thread");
 
     shutDownThread.start();
@@ -329,7 +336,7 @@ public class RemoteInterpreterServer extends Thread
         try {
           remoteInterpreterServer.shutdown();
         } catch (TException e) {
-          logger.error("Error on shutdown RemoteInterpreterServer", e);
+          LOGGER.error("Error on shutdown RemoteInterpreterServer", e);
         }
       }
     });
@@ -368,53 +375,60 @@ public class RemoteInterpreterServer extends Thread
       clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId);
       Thread.sleep(300);
     } catch (InterruptedException e) {
-      logger.error(e.getMessage(), e);
+      LOGGER.error(e.getMessage(), e);
     }
   }
 
   @Override
   public void createInterpreter(String interpreterGroupId, String sessionId, String
       className, Map<String, String> properties, String userName) throws TException {
-    if (interpreterGroup == null) {
-      interpreterGroup = new InterpreterGroup(interpreterGroupId);
-      angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
-      hookRegistry = new InterpreterHookRegistry();
-      resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient);
-      interpreterGroup.setInterpreterHookRegistry(hookRegistry);
-      interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
-      interpreterGroup.setResourcePool(resourcePool);
-      intpEventClient.setIntpGroupId(interpreterGroupId);
+    try {
+      if (interpreterGroup == null) {
+        interpreterGroup = new InterpreterGroup(interpreterGroupId);
+        angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
+        hookRegistry = new InterpreterHookRegistry();
+        resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient);
+        interpreterGroup.setInterpreterHookRegistry(hookRegistry);
+        interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+        interpreterGroup.setResourcePool(resourcePool);
+        intpEventClient.setIntpGroupId(interpreterGroupId);
+
+        String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
+        if (properties.containsKey("zeppelin.interpreter.output.limit")) {
+          InterpreterOutput.limit = Integer.parseInt(
+                  properties.get("zeppelin.interpreter.output.limit"));
+        }
 
-      String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
-      if (properties.containsKey("zeppelin.interpreter.output.limit")) {
-        InterpreterOutput.limit = Integer.parseInt(
-            properties.get("zeppelin.interpreter.output.limit"));
-      }
+        depLoader = new DependencyResolver(localRepoPath);
+        appLoader = new ApplicationLoader(resourcePool, depLoader);
 
-      depLoader = new DependencyResolver(localRepoPath);
-      appLoader = new ApplicationLoader(resourcePool, depLoader);
-    }
+        resultCacheInSeconds =
+                Integer.parseInt(properties.getOrDefault("zeppelin.interpreter.result.cache", "0"));
+      }
 
-    try {
-      Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
-      Properties p = new Properties();
-      p.putAll(properties);
-      setSystemProperty(p);
-
-      Constructor<Interpreter> constructor =
-          replClass.getConstructor(new Class[]{Properties.class});
-      Interpreter repl = constructor.newInstance(p);
-      repl.setClassloaderUrls(new URL[]{});
-      logger.info("Instantiate interpreter {}", className);
-      repl.setInterpreterGroup(interpreterGroup);
-      repl.setUserName(userName);
-
-      interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId);
-    } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
-        | InstantiationException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      logger.error(e.toString(), e);
-      throw new TException(e);
+      try {
+        Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
+        Properties p = new Properties();
+        p.putAll(properties);
+        setSystemProperty(p);
+
+        Constructor<Interpreter> constructor =
+                replClass.getConstructor(new Class[]{Properties.class});
+        Interpreter repl = constructor.newInstance(p);
+        repl.setClassloaderUrls(new URL[]{});
+        LOGGER.info("Instantiate interpreter {}", className);
+        repl.setInterpreterGroup(interpreterGroup);
+        repl.setUserName(userName);
+
+        interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId);
+      } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
+              | InstantiationException | IllegalAccessException
+              | IllegalArgumentException | InvocationTargetException e) {
+        LOGGER.error(e.toString(), e);
+        throw new TException(e);
+      }
+    } catch (Exception e) {
+      throw new TException(e.getMessage(), e);
     }
   }
 
@@ -464,7 +478,7 @@ public class RemoteInterpreterServer extends Thread
 
   @Override
   public void open(String sessionId, String className) throws TException {
-    logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
+    LOGGER.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
     Interpreter intp = getInterpreter(sessionId, className);
     try {
       intp.open();
@@ -482,12 +496,12 @@ public class RemoteInterpreterServer extends Thread
       // see NoteInterpreterLoader.SHARED_SESSION
       if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) {
         try {
-          logger.info("Unload App {} ", appInfo.pkg.getName());
+          LOGGER.info("Unload App {} ", appInfo.pkg.getName());
           appInfo.app.unload();
           // see ApplicationState.Status.UNLOADED
           intpEventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
         } catch (ApplicationException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
     }
@@ -504,7 +518,7 @@ public class RemoteInterpreterServer extends Thread
               try {
                 inp.close();
               } catch (InterpreterException e) {
-                logger.warn("Fail to close interpreter", e);
+                LOGGER.warn("Fail to close interpreter", e);
               }
               it.remove();
               break;
@@ -516,48 +530,94 @@ public class RemoteInterpreterServer extends Thread
   }
 
   @Override
-  public RemoteInterpreterResult interpret(String sessionId, String className, String st,
+  public void reconnect(String host, int port) throws TException {
+    try {
+      LOGGER.info("Reconnect to this interpreter process from {}:{}", host, port);
+      this.intpEventServerHost = host;
+      this.intpEventServerPort = port;
+      intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
+      intpEventClient.setIntpGroupId(interpreterGroupId);
+
+      this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
+      this.resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient);
+
+      // reset all the available InterpreterContext's components that use intpEventClient.
+      for (InterpreterContext context : InterpreterContext.getAllContexts().values()) {
+        context.setIntpEventClient(intpEventClient);
+        context.setAngularObjectRegistry(angularObjectRegistry);
+        context.setResourcePool(resourcePool);
+      }
+    } catch (Exception e) {
+      throw new TException("Fail to reconnect", e);
+    }
+  }
+
+  @Override
+  public RemoteInterpreterResult interpret(String sessionId,
+                                           String className,
+                                           String st,
                                            RemoteInterpreterContext interpreterContext)
       throws TException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("st:\n{}", st);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("st:\n{}", st);
     }
     Interpreter intp = getInterpreter(sessionId, className);
     InterpreterContext context = convert(interpreterContext);
     context.setInterpreterClassName(intp.getClassName());
 
-    Scheduler scheduler = intp.getScheduler();
-    InterpretJobListener jobListener = new InterpretJobListener();
-    InterpretJob job = new InterpretJob(
-        interpreterContext.getParagraphId(),
-        "RemoteInterpretJob_" + System.currentTimeMillis(),
-        jobListener,
-        intp,
-        st,
-        context);
-    scheduler.submit(job);
-
-    while (!job.isTerminated()) {
+    InterpretJob interpretJob = null;
+    boolean isRecover = Boolean.parseBoolean(
+            context.getLocalProperties().getOrDefault("isRecover", "false"));
+    if (isRecover) {
+      LOGGER.info("Recovering paragraph: " + context.getParagraphId() + " of note: "
+              + context.getNoteId());
+      interpretJob = runningJobs.get(context.getParagraphId());
+      if (interpretJob == null) {
+        InterpreterResult result = new InterpreterResult(Code.ERROR, "Job is finished, unable to recover it");
+        return convert(result,
+                context.getConfig(),
+                context.getGui(),
+                context.getNoteGui());
+      }
+    } else {
+      Scheduler scheduler = intp.getScheduler();
+      InterpretJobListener jobListener = new InterpretJobListener();
+      interpretJob = new InterpretJob(
+              context.getParagraphId(),
+              "RemoteInterpretJob_" + System.currentTimeMillis(),
+              jobListener,
+              intp,
+              st,
+              context);
+      runningJobs.put(context.getParagraphId(), interpretJob);
+      scheduler.submit(interpretJob);
+    }
+
+    while (!interpretJob.isTerminated()) {
+      JobListener jobListener = interpretJob.getListener();
       synchronized (jobListener) {
         try {
           jobListener.wait(1000);
         } catch (InterruptedException e) {
-          logger.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e);
+          LOGGER.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e);
         }
       }
     }
 
-    progressMap.remove(interpreterContext.getParagraphId());
+    progressMap.remove(context.getParagraphId());
+    resultCleanService.schedule(()-> {
+      runningJobs.remove(context.getParagraphId());
+      }, resultCacheInSeconds, TimeUnit.SECONDS);
 
-    InterpreterResult  result = (InterpreterResult) job.getReturn();
+    InterpreterResult result = interpretJob.getReturn();
     // in case of job abort in PENDING status, result can be null
     if (result == null) {
       result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT);
     }
     return convert(result,
-        context.getConfig(),
-        context.getGui(),
-        context.getNoteGui());
+            context.getConfig(),
+            context.getGui(),
+            context.getNoteGui());
   }
 
   class InterpretJobListener implements JobListener {
@@ -576,7 +636,6 @@ public class RemoteInterpreterServer extends Thread
 
   public static class InterpretJob extends Job<InterpreterResult> {
 
-
     private Interpreter interpreter;
     private String script;
     private InterpreterContext context;
@@ -657,6 +716,8 @@ public class RemoteInterpreterServer extends Thread
       ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
       try {
         InterpreterContext.set(context);
+        // clear the result of last run in frontend before running this paragraph.
+        context.out.clear();
 
         InterpreterResult result = null;
 
@@ -680,7 +741,7 @@ public class RemoteInterpreterServer extends Thread
           //     global_post_hook
           processInterpreterHooks(context.getNoteId());
           processInterpreterHooks(null);
-          logger.debug("Script after hooks: " + script);
+          LOGGER.debug("Script after hooks: " + script);
           result = interpreter.interpret(script, context);
         }
 
@@ -698,21 +759,21 @@ public class RemoteInterpreterServer extends Thread
         List<String> stringResult = new ArrayList<>();
         for (InterpreterResultMessage msg : resultMessages) {
           if (msg.getType() == InterpreterResult.Type.IMG) {
-            logger.debug("InterpreterResultMessage: IMAGE_DATA");
+            LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
           } else {
-            logger.debug("InterpreterResultMessage: " + msg.toString());
+            LOGGER.debug("InterpreterResultMessage: " + msg.toString());
           }
           stringResult.add(msg.getData());
         }
         // put result into resource pool
         if (context.getLocalProperties().containsKey("saveAs")) {
           if (stringResult.size() == 1) {
-            logger.info("Saving result into ResourcePool as single string: " +
+            LOGGER.info("Saving result into ResourcePool as single string: " +
                     context.getLocalProperties().get("saveAs"));
             context.getResourcePool().put(
                     context.getLocalProperties().get("saveAs"), stringResult.get(0));
           } else {
-            logger.info("Saving result into ResourcePool as string list: " +
+            LOGGER.info("Saving result into ResourcePool as string list: " +
                     context.getLocalProperties().get("saveAs"));
             context.getResourcePool().put(
                     context.getLocalProperties().get("saveAs"), stringResult);
@@ -743,7 +804,7 @@ public class RemoteInterpreterServer extends Thread
   public void cancel(String sessionId,
                      String className,
                      RemoteInterpreterContext interpreterContext) throws TException {
-    logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
+    LOGGER.info("cancel {} {}", className, interpreterContext.getParagraphId());
     Interpreter intp = getInterpreter(sessionId, className);
     String jobId = interpreterContext.getParagraphId();
     Job job = intp.getScheduler().getJob(jobId);
@@ -755,7 +816,7 @@ public class RemoteInterpreterServer extends Thread
         try {
           intp.cancel(convert(interpreterContext, null));
         } catch (InterpreterException e) {
-          logger.error("Fail to cancel paragraph: " + interpreterContext.getParagraphId());
+          LOGGER.error("Fail to cancel paragraph: " + interpreterContext.getParagraphId());
         }
       });
       thread.start();
@@ -845,14 +906,14 @@ public class RemoteInterpreterServer extends Thread
           intpEventClient.onInterpreterOutputUpdateAll(
               noteId, paragraphId, out.toInterpreterResultMessage());
         } catch (IOException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
 
       @Override
       public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
         String output = new String(line);
-        logger.debug("Output Append: {}", output);
+        LOGGER.debug("Output Append: {}", output);
         intpEventClient.onInterpreterOutputAppend(
             noteId, paragraphId, index, output);
       }
@@ -862,11 +923,11 @@ public class RemoteInterpreterServer extends Thread
         String output;
         try {
           output = new String(out.toByteArray());
-          logger.debug("Output Update for index {}: {}", index, output);
+          LOGGER.debug("Output Update for index {}: {}", index, output);
           intpEventClient.onInterpreterOutputUpdate(
               noteId, paragraphId, index, out.getType(), output);
         } catch (IOException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
     });
@@ -932,7 +993,7 @@ public class RemoteInterpreterServer extends Thread
     // first try local objects
     AngularObject ao = registry.get(name, noteId, paragraphId);
     if (ao == null) {
-      logger.debug("Angular object {} not exists", name);
+      LOGGER.debug("Angular object {} not exists", name);
       return;
     }
 
@@ -950,7 +1011,7 @@ public class RemoteInterpreterServer extends Thread
         return;
       } catch (Exception e) {
         // it's not a previous object's type. proceed to treat as a generic type
-        logger.debug(e.getMessage(), e);
+        LOGGER.debug(e.getMessage(), e);
       }
     }
 
@@ -962,7 +1023,7 @@ public class RemoteInterpreterServer extends Thread
             }.getType());
       } catch (Exception e) {
         // it's not a generic json object, too. okay, proceed to threat as a string type
-        logger.debug(e.getMessage(), e);
+        LOGGER.debug(e.getMessage(), e);
       }
     }
 
@@ -997,7 +1058,7 @@ public class RemoteInterpreterServer extends Thread
           }.getType());
     } catch (Exception e) {
       // it's okay. proceed to treat object as a string
-      logger.debug(e.getMessage(), e);
+      LOGGER.debug(e.getMessage(), e);
     }
 
     // try string object type at last
@@ -1017,7 +1078,7 @@ public class RemoteInterpreterServer extends Thread
 
   @Override
   public List<String> resourcePoolGetAll() throws TException {
-    logger.debug("Request resourcePoolGetAll from ZeppelinServer");
+    LOGGER.debug("Request resourcePoolGetAll from ZeppelinServer");
     List<String> result = new LinkedList<>();
 
     if (resourcePool == null) {
@@ -1041,7 +1102,7 @@ public class RemoteInterpreterServer extends Thread
   @Override
   public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName)
       throws TException {
-    logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
+    LOGGER.debug("Request resourceGet {} from ZeppelinServer", resourceName);
     Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false);
 
     if (resource == null || resource.get() == null || !resource.isSerializable()) {
@@ -1050,7 +1111,7 @@ public class RemoteInterpreterServer extends Thread
       try {
         return Resource.serializeObject(resource.get());
       } catch (IOException e) {
-        logger.error(e.getMessage(), e);
+        LOGGER.error(e.getMessage(), e);
         return ByteBuffer.allocate(0);
       }
     }
@@ -1099,7 +1160,7 @@ public class RemoteInterpreterServer extends Thread
           }
         }
       } catch (Exception e) {
-        logger.error(e.getMessage(), e);
+        LOGGER.error(e.getMessage(), e);
         return ByteBuffer.allocate(0);
       }
     }
@@ -1114,7 +1175,7 @@ public class RemoteInterpreterServer extends Thread
               }.getType());
       interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
     } catch (Exception e) {
-      logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
+      LOGGER.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
     }
   }
 
@@ -1138,7 +1199,7 @@ public class RemoteInterpreterServer extends Thread
           intpEventClient.onAppOutputUpdate(noteId, paragraphId, index, appId,
               out.getType(), new String(out.toByteArray()));
         } catch (IOException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
     });
@@ -1161,7 +1222,7 @@ public class RemoteInterpreterServer extends Thread
       String applicationInstanceId, String packageInfo, String noteId, String paragraphId)
       throws TException {
     if (runningApplications.containsKey(applicationInstanceId)) {
-      logger.warn("Application instance {} is already running");
+      LOGGER.warn("Application instance {} is already running");
       return new RemoteApplicationResult(true, "");
     }
     HeliumPackage pkgInfo = HeliumPackage.fromJson(packageInfo);
@@ -1169,7 +1230,7 @@ public class RemoteInterpreterServer extends Thread
         pkgInfo, noteId, paragraphId, applicationInstanceId);
     try {
       Application app = null;
-      logger.info(
+      LOGGER.info(
           "Loading application {}({}), artifact={}, className={} into note={}, paragraph={}",
           pkgInfo.getName(),
           applicationInstanceId,
@@ -1183,7 +1244,7 @@ public class RemoteInterpreterServer extends Thread
           new RunningApplication(pkgInfo, app, noteId, paragraphId));
       return new RemoteApplicationResult(true, "");
     } catch (Exception e) {
-      logger.error(e.getMessage(), e);
+      LOGGER.error(e.getMessage(), e);
       return new RemoteApplicationResult(false, e.getMessage());
     }
   }
@@ -1194,10 +1255,10 @@ public class RemoteInterpreterServer extends Thread
     RunningApplication runningApplication = runningApplications.remove(applicationInstanceId);
     if (runningApplication != null) {
       try {
-        logger.info("Unloading application {}", applicationInstanceId);
+        LOGGER.info("Unloading application {}", applicationInstanceId);
         runningApplication.app.unload();
       } catch (ApplicationException e) {
-        logger.error(e.getMessage(), e);
+        LOGGER.error(e.getMessage(), e);
         return new RemoteApplicationResult(false, e.getMessage());
       }
     }
@@ -1207,11 +1268,11 @@ public class RemoteInterpreterServer extends Thread
   @Override
   public RemoteApplicationResult runApplication(String applicationInstanceId)
       throws TException {
-    logger.info("run application {}", applicationInstanceId);
+    LOGGER.info("run application {}", applicationInstanceId);
 
     RunningApplication runningApp = runningApplications.get(applicationInstanceId);
     if (runningApp == null) {
-      logger.error("Application instance {} not exists", applicationInstanceId);
+      LOGGER.error("Application instance {} not exists", applicationInstanceId);
       return new RemoteApplicationResult(false, "Application instance does not exists");
     } else {
       ApplicationContext context = runningApp.app.context();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
index 4b053d6..7ae3e8c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
index 2511ab9..42e062e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class AppOutputAppendEvent implements org.apache.thrift.TBase<AppOutputAppendEvent, AppOutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputAppendEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
index 8f4c9ca..87ae2dd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class AppOutputUpdateEvent implements org.apache.thrift.TBase<AppOutputUpdateEvent, AppOutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
index 550efeb..1ffb9b8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class AppStatusUpdateEvent implements org.apache.thrift.TBase<AppStatusUpdateEvent, AppStatusUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppStatusUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
index ddb5512..e138d8c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
index c0757fe..7e03eb4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class OutputAppendEvent implements org.apache.thrift.TBase<OutputAppendEvent, OutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputAppendEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
index 944241e..0de49fd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class OutputUpdateAllEvent implements org.apache.thrift.TBase<OutputUpdateAllEvent, OutputUpdateAllEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateAllEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
index cea1774..3b46850 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class OutputUpdateEvent implements org.apache.thrift.TBase<OutputUpdateEvent, OutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
index 465b8bf..98f3653 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class ParagraphInfo implements org.apache.thrift.TBase<ParagraphInfo, ParagraphInfo._Fields>, java.io.Serializable, Cloneable, Comparable<ParagraphInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ParagraphInfo");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
index 8744e2b..804c859 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RegisterInfo implements org.apache.thrift.TBase<RegisterInfo, RegisterInfo._Fields>, java.io.Serializable, Cloneable, Comparable<RegisterInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
index d869dff..974bce8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index ce9dff3..31aa514 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 2a99208..7d3031c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
index e1eb9e8..8acc113 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterEventService {
 
   public interface Iface {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 06be47a..7761c24 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   NO_OP(1),
   ANGULAR_OBJECT_ADD(2),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 2817903..5eb6ee3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
index e0ec936..fb2642a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 68dc727..a472004 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RemoteInterpreterService {
 
   public interface Iface {
@@ -35,6 +35,8 @@ public class RemoteInterpreterService {
 
     public void close(java.lang.String sessionId, java.lang.String className) throws org.apache.thrift.TException;
 
+    public void reconnect(java.lang.String host, int port) throws org.apache.thrift.TException;
+
     public RemoteInterpreterResult interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException;
 
     public void cancel(java.lang.String sessionId, java.lang.String className, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException;
@@ -81,6 +83,8 @@ public class RemoteInterpreterService {
 
     public void close(java.lang.String sessionId, java.lang.String className, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
 
+    public void reconnect(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
+
     public void interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback<RemoteInterpreterResult> resultHandler) throws org.apache.thrift.TException;
 
     public void cancel(java.lang.String sessionId, java.lang.String className, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
@@ -205,6 +209,27 @@ public class RemoteInterpreterService {
       return;
     }
 
+    public void reconnect(java.lang.String host, int port) throws org.apache.thrift.TException
+    {
+      send_reconnect(host, port);
+      recv_reconnect();
+    }
+
+    public void send_reconnect(java.lang.String host, int port) throws org.apache.thrift.TException
+    {
+      reconnect_args args = new reconnect_args();
+      args.setHost(host);
+      args.setPort(port);
+      sendBase("reconnect", args);
+    }
+
+    public void recv_reconnect() throws org.apache.thrift.TException
+    {
+      reconnect_result result = new reconnect_result();
+      receiveBase(result, "reconnect");
+      return;
+    }
+
     public RemoteInterpreterResult interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException
     {
       send_interpret(sessionId, className, st, interpreterContext);
@@ -762,6 +787,41 @@ public class RemoteInterpreterService {
       }
     }
 
+    public void reconnect(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      reconnect_call method_call = new reconnect_call(host, port, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class reconnect_call extends org.apache.thrift.async.TAsyncMethodCall<Void> {
+      private java.lang.String host;
+      private int port;
+      public reconnect_call(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.host = host;
+        this.port = port;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("reconnect", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        reconnect_args args = new reconnect_args();
+        args.setHost(host);
+        args.setPort(port);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public Void getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new java.lang.IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return null;
+      }
+    }
+
     public void interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback<RemoteInterpreterResult> resultHandler) throws org.apache.thrift.TException {
       checkReady();
       interpret_call method_call = new interpret_call(sessionId, className, st, interpreterContext, resultHandler, this, ___protocolFactory, ___transport);
@@ -1441,6 +1501,7 @@ public class RemoteInterpreterService {
       processMap.put("createInterpreter", new createInterpreter());
       processMap.put("open", new open());
       processMap.put("close", new close());
+      processMap.put("reconnect", new reconnect());
       processMap.put("interpret", new interpret());
       processMap.put("cancel", new cancel());
       processMap.put("getProgress", new getProgress());
@@ -1537,6 +1598,31 @@ public class RemoteInterpreterService {
       }
     }
 
+    public static class reconnect<I extends Iface> extends org.apache.thrift.ProcessFunction<I, reconnect_args> {
+      public reconnect() {
+        super("reconnect");
+      }
+
+      public reconnect_args getEmptyArgsInstance() {
+        return new reconnect_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      @Override
+      protected boolean rethrowUnhandledExceptions() {
+        return false;
+      }
+
+      public reconnect_result getResult(I iface, reconnect_args args) throws org.apache.thrift.TException {
+        reconnect_result result = new reconnect_result();
+        iface.reconnect(args.host, args.port);
+        return result;
+      }
+    }
+
     public static class interpret<I extends Iface> extends org.apache.thrift.ProcessFunction<I, interpret_args> {
       public interpret() {
         super("interpret");
@@ -2005,6 +2091,7 @@ public class RemoteInterpreterService {
       processMap.put("createInterpreter", new createInterpreter());
       processMap.put("open", new open());
       processMap.put("close", new close());
+      processMap.put("reconnect", new reconnect());
       processMap.put("interpret", new interpret());
       processMap.put("cancel", new cancel());
       processMap.put("getProgress", new getProgress());
@@ -2206,6 +2293,66 @@ public class RemoteInterpreterService {
       }
     }
 
+    public static class reconnect<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, reconnect_args, Void> {
+      public reconnect() {
+        super("reconnect");
+      }
+
+      public reconnect_args getEmptyArgsInstance() {
+        return new reconnect_args();
+      }
+
+      public org.apache.thrift.async.AsyncMethodCallback<Void> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new org.apache.thrift.async.AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            reconnect_result result = new reconnect_result();
+            try {
+              fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+            } catch (org.apache.thrift.transport.TTransportException e) {
+              _LOGGER.error("TTransportException writing to internal frame buffer", e);
+              fb.close();
+            } catch (java.lang.Exception e) {
+              _LOGGER.error("Exception writing to internal frame buffer", e);
+              onError(e);
+            }
+          }
+          public void onError(java.lang.Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TSerializable msg;
+            reconnect_result result = new reconnect_result();
+            if (e instanceof org.apache.thrift.transport.TTransportException) {
+              _LOGGER.error("TTransportException inside handler", e);
+              fb.close();
+              return;
+            } else if (e instanceof org.apache.thrift.TApplicationException) {
+              _LOGGER.error("TApplicationException inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TApplicationException)e;
+            } else {
+              _LOGGER.error("Exception inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+            } catch (java.lang.Exception ex) {
+              _LOGGER.error("Exception writing to internal frame buffer", ex);
+              fb.close();
+            }
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, reconnect_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+        iface.reconnect(args.host, args.port,resultHandler);
+      }
+    }
+
     public static class interpret<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, interpret_args, RemoteInterpreterResult> {
       public interpret() {
         super("interpret");
@@ -5847,6 +5994,727 @@ public class RemoteInterpreterService {
     }
   }
 
+  public static class reconnect_args implements org.apache.thrift.TBase<reconnect_args, reconnect_args._Fields>, java.io.Serializable, Cloneable, Comparable<reconnect_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("reconnect_args");
+
+    private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
+    private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new reconnect_argsStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new reconnect_argsTupleSchemeFactory();
+
+    public @org.apache.thrift.annotation.Nullable java.lang.String host; // required
+    public int port; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      HOST((short)1, "host"),
+      PORT((short)2, "port");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // HOST
+            return HOST;
+          case 2: // PORT
+            return PORT;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __PORT_ISSET_ID = 0;
+    private byte __isset_bitfield = 0;
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reconnect_args.class, metaDataMap);
+    }
+
+    public reconnect_args() {
+    }
+
+    public reconnect_args(
+      java.lang.String host,
+      int port)
+    {
+      this();
+      this.host = host;
+      this.port = port;
+      setPortIsSet(true);
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public reconnect_args(reconnect_args other) {
+      __isset_bitfield = other.__isset_bitfield;
+      if (other.isSetHost()) {
+        this.host = other.host;
+      }
+      this.port = other.port;
+    }
+
+    public reconnect_args deepCopy() {
+      return new reconnect_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.host = null;
+      setPortIsSet(false);
+      this.port = 0;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.String getHost() {
+      return this.host;
+    }
+
+    public reconnect_args setHost(@org.apache.thrift.annotation.Nullable java.lang.String host) {
+      this.host = host;
+      return this;
+    }
+
+    public void unsetHost() {
+      this.host = null;
+    }
+
+    /** Returns true if field host is set (has been assigned a value) and false otherwise */
+    public boolean isSetHost() {
+      return this.host != null;
+    }
+
+    public void setHostIsSet(boolean value) {
+      if (!value) {
+        this.host = null;
+      }
+    }
+
+    public int getPort() {
+      return this.port;
+    }
+
+    public reconnect_args setPort(int port) {
+      this.port = port;
+      setPortIsSet(true);
+      return this;
+    }
+
+    public void unsetPort() {
+      __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
+    }
+
+    /** Returns true if field port is set (has been assigned a value) and false otherwise */
+    public boolean isSetPort() {
+      return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
+    }
+
+    public void setPortIsSet(boolean value) {
+      __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
+    }
+
+    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+      switch (field) {
+      case HOST:
+        if (value == null) {
+          unsetHost();
+        } else {
+          setHost((java.lang.String)value);
+        }
+        break;
+
+      case PORT:
+        if (value == null) {
+          unsetPort();
+        } else {
+          setPort((java.lang.Integer)value);
+        }
+        break;
+
+      }
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case HOST:
+        return getHost();
+
+      case PORT:
+        return getPort();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case HOST:
+        return isSetHost();
+      case PORT:
+        return isSetPort();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof reconnect_args)
+        return this.equals((reconnect_args)that);
+      return false;
+    }
+
+    public boolean equals(reconnect_args that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_host = true && this.isSetHost();
+      boolean that_present_host = true && that.isSetHost();
+      if (this_present_host || that_present_host) {
+        if (!(this_present_host && that_present_host))
+          return false;
+        if (!this.host.equals(that.host))
+          return false;
+      }
+
+      boolean this_present_port = true;
+      boolean that_present_port = true;
+      if (this_present_port || that_present_port) {
+        if (!(this_present_port && that_present_port))
+          return false;
+        if (this.port != that.port)
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((isSetHost()) ? 131071 : 524287);
+      if (isSetHost())
+        hashCode = hashCode * 8191 + host.hashCode();
+
+      hashCode = hashCode * 8191 + port;
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(reconnect_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = java.lang.Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetHost()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = java.lang.Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetPort()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("reconnect_args(");
+      boolean first = true;
+
+      sb.append("host:");
+      if (this.host == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.host);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("port:");
+      sb.append(this.port);
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bitfield = 0;
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class reconnect_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public reconnect_argsStandardScheme getScheme() {
+        return new reconnect_argsStandardScheme();
+      }
+    }
+
+    private static class reconnect_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<reconnect_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, reconnect_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // HOST
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.host = iprot.readString();
+                struct.setHostIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // PORT
+              if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+                struct.port = iprot.readI32();
+                struct.setPortIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, reconnect_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.host != null) {
+          oprot.writeFieldBegin(HOST_FIELD_DESC);
+          oprot.writeString(struct.host);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldBegin(PORT_FIELD_DESC);
+        oprot.writeI32(struct.port);
+        oprot.writeFieldEnd();
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class reconnect_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public reconnect_argsTupleScheme getScheme() {
+        return new reconnect_argsTupleScheme();
+      }
+    }
+
+    private static class reconnect_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<reconnect_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, reconnect_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.isSetHost()) {
+          optionals.set(0);
+        }
+        if (struct.isSetPort()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetHost()) {
+          oprot.writeString(struct.host);
+        }
+        if (struct.isSetPort()) {
+          oprot.writeI32(struct.port);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, reconnect_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          struct.host = iprot.readString();
+          struct.setHostIsSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.port = iprot.readI32();
+          struct.setPortIsSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+  public static class reconnect_result implements org.apache.thrift.TBase<reconnect_result, reconnect_result._Fields>, java.io.Serializable, Cloneable, Comparable<reconnect_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("reconnect_result");
+
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new reconnect_resultStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new reconnect_resultTupleSchemeFactory();
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reconnect_result.class, metaDataMap);
+    }
+
+    public reconnect_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public reconnect_result(reconnect_result other) {
+    }
+
+    public reconnect_result deepCopy() {
+      return new reconnect_result(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+      switch (field) {
+      }
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof reconnect_result)
+        return this.equals((reconnect_result)that);
+      return false;
+    }
+
+    public boolean equals(reconnect_result that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(reconnect_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      return 0;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+      }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("reconnect_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class reconnect_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public reconnect_resultStandardScheme getScheme() {
+        return new reconnect_resultStandardScheme();
+      }
+    }
+
+    private static class reconnect_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<reconnect_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, reconnect_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, reconnect_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class reconnect_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public reconnect_resultTupleScheme getScheme() {
+        return new reconnect_resultTupleScheme();
+      }
+    }
+
+    private static class reconnect_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<reconnect_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, reconnect_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, reconnect_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
   public static class interpret_args implements org.apache.thrift.TBase<interpret_args, interpret_args._Fields>, java.io.Serializable, Cloneable, Comparable<interpret_args>   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("interpret_args");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
index 72cae21..4a39c82 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class RunParagraphsEvent implements org.apache.thrift.TBase<RunParagraphsEvent, RunParagraphsEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RunParagraphsEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
index 4886182..772dd05 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09")
 public class ServiceException extends org.apache.thrift.TException implements org.apache.thrift.TBase<ServiceException, ServiceException._Fields>, java.io.Serializable, Cloneable, Comparable<ServiceException> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ServiceException");
 
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 2dd9b01..baaac22 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -93,6 +93,7 @@ service RemoteInterpreterService {
   void createInterpreter(1: string intpGroupId, 2: string sessionId, 3: string className, 4: map<string, string> properties, 5: string userName);
   void open(1: string sessionId, 2: string className);
   void close(1: string sessionId, 2: string className);
+  void reconnect(1: string host, 2: i32 port);
   RemoteInterpreterResult interpret(1: string sessionId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
   void cancel(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
   i32 getProgress(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index ff6d69a..f392b3d 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -60,7 +60,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
   }
 
   @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+  public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
 
     this.context = context;
@@ -80,8 +80,11 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
                 context.getInterpreterSettingName(),
                 context.getInterpreterGroupId(),
                 connectTimeout,
+                context.getIntpEventServerHost(),
+                context.getIntpEventServerPort(),
                 intpTserverHost,
-                intpTserverPort);
+                intpTserverPort,
+                false);
           }
 
           @Override
@@ -152,8 +155,11 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
                 context.getInterpreterSettingName(),
                 context.getInterpreterGroupId(),
                 connectTimeout,
+                context.getIntpEventServerHost(),
+                context.getIntpEventServerPort(),
                 intpTserverHost,
-                intpTserverPort);
+                intpTserverPort,
+                false);
           }
 
           @Override
@@ -244,8 +250,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
 
       clusterIntpProcess = new ClusterInterpreterProcess(
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
-          context.getZeppelinServerRPCPort(),
-          context.getZeppelinServerHost(),
+          context.getIntpEventServerPort(),
+          context.getIntpEventServerHost(),
           zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + intpSetGroupName,
           localRepoPath,
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index 744e880..e7960ad 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -10,8 +10,8 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
 
   public ClusterInterpreterProcess(
       String intpRunner,
-      int zeppelinServerRPCPort,
-      String zeppelinServerRPCHost,
+      int intpEventServerPort,
+      String intpEventServerHost,
       String interpreterPortRange,
       String intpDir,
       String localRepoDir,
@@ -22,8 +22,8 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
       boolean isUserImpersonated) {
 
     super(intpRunner,
-      zeppelinServerRPCPort,
-      zeppelinServerRPCHost,
+      intpEventServerPort,
+      intpEventServerHost,
       interpreterPortRange,
       intpDir,
       localRepoDir,
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
index e6c9ae1..ded6885 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java
@@ -38,7 +38,7 @@ public class DockerInterpreterLauncher extends InterpreterLauncher {
   }
 
   @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+  public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
     this.context = context;
     this.properties = context.getProperties();
@@ -71,8 +71,8 @@ public class DockerInterpreterLauncher extends InterpreterLauncher {
         context.getInterpreterSettingName(),
         properties,
         env,
-        context.getZeppelinServerHost(),
-        Integer.toString(context.getZeppelinServerRPCPort()),
+        context.getIntpEventServerHost(),
+        context.getIntpEventServerPort(),
         connectTimeout);
   }
 
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index e096f4c..7e1bbb5 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -77,8 +77,6 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
   private final String containerImage;
   private final Properties properties;
   private final Map<String, String> envs;
-  private final String zeppelinServiceHost;
-  private final String zeppelinServiceRpcPort;
 
   private AtomicBoolean dockerStarted = new AtomicBoolean(false);
 
@@ -117,11 +115,11 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
       String interpreterSettingName,
       Properties properties,
       Map<String, String> envs,
-      String zeppelinServiceHost,
-      String zeppelinServiceRpcPort,
+      String intpEventServerHost,
+      int intpEventServerPort,
       int connectTimeout
   ) {
-    super(connectTimeout);
+    super(connectTimeout, intpEventServerHost, intpEventServerPort);
 
     this.containerImage = containerImage;
     this.interpreterGroupId = interpreterGroupId;
@@ -129,8 +127,6 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
     this.interpreterSettingName = interpreterSettingName;
     this.properties = properties;
     this.envs = new HashMap(envs);
-    this.zeppelinServiceHost = zeppelinServiceHost;
-    this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
 
     this.zconf = zconf;
     this.containerName = interpreterGroupId.toLowerCase();
@@ -212,7 +208,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
     // Create container with exposed ports
     final ContainerConfig containerConfig = ContainerConfig.builder()
         .hostConfig(hostConfig)
-        .hostname(this.zeppelinServiceHost)
+        .hostname(this.intpEventServerHost)
         .image(containerImage)
         .workingDir("/")
         .env(listEnv)
@@ -305,8 +301,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
     dockerProperties.put("zeppelin.interpreter.localRepo", "/tmp/local-repo");
     dockerProperties.put("zeppelin.interpreter.rpc.portRange",
         dockerIntpServicePort + ":" + dockerIntpServicePort);
-    dockerProperties.put("zeppelin.server.rpc.host", zeppelinServiceHost);
-    dockerProperties.put("zeppelin.server.rpc.portRange", zeppelinServiceRpcPort);
+    dockerProperties.put("zeppelin.server.rpc.host", intpEventServerHost);
+    dockerProperties.put("zeppelin.server.rpc.portRange", intpEventServerPort);
 
     // interpreter properties overrides the values
     dockerProperties.putAll(Maps.fromProperties(properties));
diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
index a700333..a6863c8 100644
--- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
@@ -90,7 +90,7 @@ public class DockerInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.hostname",
-        "12320",
+        12320,
         5000);
 
     assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home");
@@ -116,7 +116,7 @@ public class DockerInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.hostname",
-        "12320",
+        12320,
         5000);
 
     Properties dockerProperties = intp.getTemplateBindings();
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 120500b..f07389e 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -30,8 +30,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private final String containerImage;
   private final Properties properties;
   private final Map<String, String> envs;
-  private final String zeppelinService;
-  private final String zeppelinServiceRpcPort;
 
   private final Gson gson = new Gson();
   private final String podName;
@@ -60,14 +58,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           String interpreterSettingName,
           Properties properties,
           Map<String, String> envs,
-          String zeppelinService,
-          String zeppelinServiceRpcPort,
+          String intpEventServerHost,
+          int intpEventServerPort,
           boolean portForward,
           String sparkImage,
           int connectTimeout,
           boolean isUserImpersonatedForSpark
   ) {
-    super(connectTimeout);
+    super(connectTimeout, intpEventServerHost, intpEventServerPort);
     this.kubectl = kubectl;
     this.specTempaltes = specTemplates;
     this.containerImage = containerImage;
@@ -76,8 +74,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     this.interpreterSettingName = interpreterSettingName;
     this.properties = properties;
     this.envs = new HashMap<>(envs);
-    this.zeppelinService = zeppelinService;
-    this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
     this.portForward = portForward;
     this.sparkImage = sparkImage;
     this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
@@ -276,8 +272,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
     k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
     k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort()));
-    k8sProperties.put("zeppelin.k8s.server.rpc.service", zeppelinService);
-    k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort);
+    k8sProperties.put("zeppelin.k8s.server.rpc.service", intpEventServerHost);
+    k8sProperties.put("zeppelin.k8s.server.rpc.portRange", intpEventServerPort);
     if (ownerUID() != null && ownerName() != null) {
       k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
       k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName());
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index d4b03da..d1df0f3 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -108,7 +108,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
               zConf.getK8sServiceName(),
               getNamespace());
     } else {
-      return context.getZeppelinServerHost();
+      return context.getIntpEventServerHost();
     }
   }
 
@@ -116,13 +116,13 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * get Zeppelin server rpc port
    * Read env variable "<HOSTNAME>_SERVICE_PORT_RPC"
    */
-  private String getZeppelinServiceRpcPort() {
+  private int getZeppelinServiceRpcPort() {
     String envServicePort = System.getenv(
             String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase()));
     if (envServicePort != null) {
-      return envServicePort;
+      return Integer.parseInt(envServicePort);
     } else {
-      return Integer.toString(context.getZeppelinServerRPCPort());
+      return context.getIntpEventServerPort();
     }
   }
 
@@ -139,7 +139,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
   }
 
   @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+  public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
     this.context = context;
     this.properties = context.getProperties();
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 7e1e668..3718fa0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -51,7 +51,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.hostname",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -81,7 +81,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.hostname",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -116,7 +116,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -136,7 +136,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo"));
     assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange"));
     assertEquals("zeppelin.server.service" , p.get("zeppelin.k8s.server.rpc.service"));
-    assertEquals("12320" , p.get("zeppelin.k8s.server.rpc.portRange"));
+    assertEquals(12320 , p.get("zeppelin.k8s.server.rpc.portRange"));
     assertEquals("v1", p.get("my.key1"));
     assertEquals("V1", envs.get("MY_ENV1"));
 
@@ -169,7 +169,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -222,7 +222,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -274,7 +274,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -315,7 +315,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -360,7 +360,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
@@ -397,7 +397,7 @@ public class K8sRemoteInterpreterProcessTest {
         properties,
         envs,
         "zeppelin.server.service",
-        "12320",
+        12320,
         false,
         "spark-container:1.0",
         10,
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index e04ebf2..2637e97 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -1894,12 +1894,14 @@ public class NotebookServer extends WebSocketServlet
       }
     }
 
-    if (p.isTerminated()) {
+    if (p.isTerminated() || after == Status.RUNNING) {
       if (p.getStatus() == Status.FINISHED) {
         LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus());
-      } else {
+      } else if (p.isTerminated()) {
         LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(),
             p.getStatus(), p.getException(), p.getReturn());
+      } else {
+        LOG.info("Job {} starts to RUNNING", p.getId());
       }
 
       try {
@@ -1928,6 +1930,7 @@ public class NotebookServer extends WebSocketServlet
     try {
       Note note = getNotebook().getNote(noteId);
       note.getParagraph(paragraphId).checkpointOutput();
+      getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS);
     } catch (IOException e) {
       LOG.warn("Fail to save note: " + noteId , e);
     }
@@ -2076,6 +2079,7 @@ public class NotebookServer extends WebSocketServlet
 
           paragraph
                   .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId());
+          getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS);
           getConnectionManager().broadcast(
                   note.getId(),
                   new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos",
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
index e51eac3..8d9a686 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
@@ -16,17 +16,14 @@
  */
 package org.apache.zeppelin.recovery;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import java.io.File;
-import java.util.Map;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.StopInterpreter;
@@ -38,12 +35,19 @@ import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.utils.TestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
 public class RecoveryTest extends AbstractTestRestApi {
+
   private Gson gson = new Gson();
   private static File recoveryDir = null;
 
@@ -51,31 +55,29 @@ public class RecoveryTest extends AbstractTestRestApi {
 
   private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
 
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
-        FileSystemRecoveryStorage.class.getName());
+            FileSystemRecoveryStorage.class.getName());
     recoveryDir = Files.createTempDir();
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(),
             recoveryDir.getAbsolutePath());
     startUp(RecoveryTest.class.getSimpleName());
+
+    notebook = ZeppelinServer.sharedServiceLocator.getService(Notebook.class);
   }
 
-  @AfterClass
-  public static void destroy() throws Exception {
-    shutDown();
+  @After
+  public void destroy() throws Exception {
+    shutDown(true, true);
     FileUtils.deleteDirectory(recoveryDir);
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
             ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getStringValue());
   }
 
-  @Before
-  public void setUp() {
-    notebook = ZeppelinServer.sharedServiceLocator.getService(Notebook.class);
-  }
-
   @Test
   public void testRecovery() throws Exception {
+    LOG.info("Test testRecovery");
     Note note1 = null;
     try {
       note1 = notebook.createNote("note1", anonymous);
@@ -105,6 +107,9 @@ public class RecoveryTest extends AbstractTestRestApi {
       post.releaseConnection();
       assertEquals(Job.Status.FINISHED, p1.getStatus());
       assertEquals("abc\n", p1.getReturn().message().get(0).getData());
+    } catch (Exception e) {
+      LOG.error(e.toString(), e);
+      throw e;
     } finally {
       if (null != note1) {
         TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
@@ -114,6 +119,7 @@ public class RecoveryTest extends AbstractTestRestApi {
 
   @Test
   public void testRecovery_2() throws Exception {
+    LOG.info("Test testRecovery_2");
     Note note1 = null;
     try {
       note1 = notebook.createNote("note2", AuthenticationInfo.ANONYMOUS);
@@ -148,6 +154,9 @@ public class RecoveryTest extends AbstractTestRestApi {
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
       assertEquals(Job.Status.ERROR, p1.getStatus());
+    } catch (Exception e) {
+      LOG.error(e.toString(), e);
+      throw e;
     } finally {
       if (null != note1) {
         TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
@@ -157,6 +166,7 @@ public class RecoveryTest extends AbstractTestRestApi {
 
   @Test
   public void testRecovery_3() throws Exception {
+    LOG.info("Test testRecovery_3");
     Note note1 = null;
     try {
       note1 = TestUtils.getInstance(Notebook.class).createNote("note3", AuthenticationInfo.ANONYMOUS);
@@ -188,6 +198,113 @@ public class RecoveryTest extends AbstractTestRestApi {
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
       assertEquals(Job.Status.ERROR, p1.getStatus());
+    } catch (Exception e ) {
+      LOG.error(e.toString(), e);
+      throw e;
+    } finally {
+      if (null != note1) {
+        TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+      }
+    }
+  }
+
+  @Test
+  public void testRecovery_Running_Paragraph_sh() throws Exception {
+    LOG.info("Test testRecovery_Running_Paragraph_sh");
+    Note note1 = null;
+    try {
+      note1 = TestUtils.getInstance(Notebook.class).createNote("note4", AuthenticationInfo.ANONYMOUS);
+
+      // run sh paragraph async, print 'hello' after 10 seconds
+      Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%sh sleep 10\necho 'hello'");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p1.getId(), "");
+      assertThat(post, isAllowed());
+      post.releaseConnection();
+      long start = System.currentTimeMillis();
+      // wait until paragraph is RUNNING
+      while((System.currentTimeMillis() - start) < 10 * 1000) {
+        if (p1.getStatus() == Job.Status.RUNNING) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      if (p1.getStatus() != Job.Status.RUNNING) {
+        fail("Fail to run paragraph: " + p1.getReturn());
+      }
+
+      // shutdown zeppelin and restart it
+      shutDown();
+      startUp(RecoveryTest.class.getSimpleName(), false);
+
+      // wait until paragraph is finished
+      start = System.currentTimeMillis();
+      while((System.currentTimeMillis() - start) < 10 * 1000) {
+        if (p1.isTerminated()) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+      assertEquals("hello\n", p1.getReturn().message().get(0).getData());
+    } catch (Exception e ) {
+      LOG.error(e.toString(), e);
+      throw e;
+    } finally {
+      if (null != note1) {
+        TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+      }
+    }
+  }
+
+  @Test
+  public void testRecovery_Finished_Paragraph_python() throws Exception {
+    LOG.info("Test testRecovery_Finished_Paragraph_python");
+    Note note1 = null;
+    try {
+      InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
+      InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("python");
+      interpreterSetting.setProperty("zeppelin.python.useIPython", "false");
+      interpreterSetting.setProperty("zeppelin.interpreter.result.cache", "100");
+
+      note1 = TestUtils.getInstance(Notebook.class).createNote("note4", AuthenticationInfo.ANONYMOUS);
+
+      // run sh paragraph async, print 'hello' after 10 seconds
+      Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%python import time\n" +
+              "for i in range(1, 10):\n" +
+              "    time.sleep(1)\n" +
+              "    print(i)");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p1.getId(), "");
+      assertThat(post, isAllowed());
+      post.releaseConnection();
+
+      // wait until paragraph is running
+      while(p1.getStatus() != Job.Status.RUNNING) {
+        Thread.sleep(1000);
+      }
+
+      // shutdown zeppelin and restart it
+      shutDown();
+      // sleep 15 seconds to make sure the paragraph is finished
+      Thread.sleep(10 * 1500);
+
+      startUp(RecoveryTest.class.getSimpleName(), false);
+
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+      assertEquals("1\n" +
+              "2\n" +
+              "3\n" +
+              "4\n" +
+              "5\n" +
+              "6\n" +
+              "7\n" +
+              "8\n" +
+              "9\n", p1.getReturn().message().get(0).getData());
+    } catch (Exception e ) {
+      LOG.error(e.toString(), e);
+      throw e;
     } finally {
       if (null != note1) {
         TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 2be4d54..6552d0f 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -289,12 +289,17 @@ public abstract class AbstractTestRestApi {
   }
 
   protected static void shutDown(final boolean deleteConfDir) throws Exception {
+    shutDown(deleteConfDir, false);
+  }
+
+  protected static void shutDown(final boolean deleteConfDir,
+                                 boolean forceShutdownInterpreter) throws Exception {
 
     if (!WAS_RUNNING && TestUtils.getInstance(Notebook.class) != null) {
       // restart interpreter to stop all interpreter processes
       List<InterpreterSetting> settingList = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
               .get();
-      if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) {
+      if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled() || forceShutdownInterpreter) {
         for (InterpreterSetting setting : settingList) {
           TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(setting.getId());
         }
@@ -336,7 +341,6 @@ public abstract class AbstractTestRestApi {
       TestUtils.clearInstances();
       ZeppelinServer.reset();
     }
-
   }
 
   protected static boolean checkIfServerIsRunning() {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 9626181..5338bfc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -177,12 +177,15 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
     this.angularObjectRegistryListener = angularObjectRegistryListener;
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
     this.appEventListener = appEventListener;
+
+    this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this);
+    this.interpreterEventServer.start();
+
     this.recoveryStorage =
         ReflectionUtils.createClazzInstance(
             conf.getRecoveryStorageClass(),
             new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
             new Object[] {conf, this});
-    this.recoveryStorage.init();
     LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName());
     this.lifecycleManager =
         ReflectionUtils.createClazzInstance(
@@ -192,11 +195,13 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
     LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
 
     this.configStorage = configStorage;
-    this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this);
-    this.interpreterEventServer.start();
     init();
   }
 
+  public RemoteInterpreterEventServer getInterpreterEventServer() {
+    return interpreterEventServer;
+  }
+
   public void refreshInterpreterTemplates() {
     Set<String> installedInterpreters = Sets.newHashSet(interpreterSettingTemplates.keySet());
 
@@ -322,6 +327,9 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
     loadInterpreterSettingFromDefaultDir(true);
     loadFromFile();
     saveToFile();
+
+    // must init Recovery after init of InterpreterSettingManagaer
+    recoveryStorage.init();
   }
 
   private void loadJupyterKernelLanguageMap() throws IOException {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index b387475..a27677f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -40,6 +40,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
 
   private InterpreterSetting interpreterSetting;
   private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
+  private Object interpreterProcessCreationLock = new Object();
 
   /**
    * Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer
@@ -55,19 +56,21 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     return interpreterSetting;
   }
 
-  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
-                                                                             Properties properties)
+  public RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
+                                                                Properties properties)
       throws IOException {
-    if (remoteInterpreterProcess == null) {
-      LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
-      remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
-          properties);
-      remoteInterpreterProcess.start(userName);
-      interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this);
-      getInterpreterSetting().getRecoveryStorage()
-          .onInterpreterClientStart(remoteInterpreterProcess);
+    synchronized (interpreterProcessCreationLock) {
+      if (remoteInterpreterProcess == null) {
+        LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
+        remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
+                properties);
+        remoteInterpreterProcess.start(userName);
+        interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this);
+        getInterpreterSetting().getRecoveryStorage()
+                .onInterpreterClientStart(remoteInterpreterProcess);
+      }
+      return remoteInterpreterProcess;
     }
-    return remoteInterpreterProcess;
   }
 
   public RemoteInterpreterProcess getInterpreterProcess() {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 11ba6f9..877d045 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -166,7 +166,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
       LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " +
           registerInfo.getInterpreterGroupId());
     }
-
+    LOGGER.info("Register interpreter process: {}:{}, {}",
+            registerInfo.getHost(), registerInfo.getPort(), registerInfo.getInterpreterGroupId());
     interpreterProcess.processStarted(registerInfo.port, registerInfo.host);
   }
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 617587a..51776ba 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index ff60b39..bc5034a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -46,8 +45,8 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
   }
 
   @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
-    LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+  public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
+    LOGGER.info("Launching new interpreter process of " + context.getInterpreterSettingGroup());
     this.properties = context.getProperties();
     InterpreterOption option = context.getOption();
     InterpreterRunner runner = context.getRunner();
@@ -60,31 +59,18 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
           context.getInterpreterSettingName(),
           context.getInterpreterGroupId(),
           connectTimeout,
+          context.getIntpEventServerHost(),
+          context.getIntpEventServerPort(),
           option.getHost(),
-          option.getPort());
+          option.getPort(),
+          false);
     } else {
-      // try to recover it first
-      if (zConf.isRecoveryEnabled()) {
-        InterpreterClient recoveredClient =
-            recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
-        if (recoveredClient != null) {
-          if (recoveredClient.isRunning()) {
-            LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
-                recoveredClient.getPort());
-            return recoveredClient;
-          } else {
-            LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
-                + recoveredClient.getPort() + ", as it is already terminated.");
-          }
-        }
-      }
-
       // create new remote process
       String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
           + context.getInterpreterSettingId();
       return new RemoteInterpreterManagedProcess(
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
-          context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(),
+          context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
           buildEnvFromProperties(context), connectTimeout, name,
           context.getInterpreterGroupId(), option.isUserImpersonate());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
index 1b660ac..46ac23a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
@@ -39,6 +39,8 @@ import java.util.Map;
 
 /**
  * Hadoop compatible FileSystem based RecoveryStorage implementation.
+ * All the running interpreter process info will be save into files on hdfs.
+ * Each interpreter setting will have one file.
  *
  * Save InterpreterProcess in the format of:
  * InterpreterGroupId host:port
@@ -47,16 +49,15 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
 
-  private InterpreterSettingManager interpreterSettingManager;
   private FileSystemStorage fs;
   private Path recoveryDir;
+  private InterpreterSettingManager interpreterSettingManager;
 
   public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
                                    InterpreterSettingManager interpreterSettingManager)
       throws IOException {
     super(zConf);
     this.interpreterSettingManager = interpreterSettingManager;
-    this.zConf = zConf;
     this.fs = new FileSystemStorage(zConf, zConf.getRecoveryDir());
     LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() +
         " for Zeppelin Recovery.");
@@ -79,17 +80,19 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
     InterpreterSetting interpreterSetting =
         interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
     List<String> recoveryContent = new ArrayList<>();
-    for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
-      RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
-      if (interpreterProcess != null) {
-        recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
-            interpreterProcess.getPort());
+    if (interpreterSetting != null) {
+      for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
+        RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
+        if (interpreterProcess != null && interpreterProcess.isRunning()) {
+          recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
+                  interpreterProcess.getPort());
+        }
       }
     }
-    LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
-    LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator()));
+    String recoveryContentStr = StringUtils.join(recoveryContent, System.lineSeparator());
+    LOGGER.debug("Updating recovery data of {}: {}", interpreterSettingName, recoveryContentStr);
     Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
-    fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true);
+    fs.writeFile(recoveryContentStr, recoveryFile, true);
   }
 
   @Override
@@ -105,16 +108,18 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
       if (!StringUtils.isBlank(recoveryContent)) {
         for (String line : recoveryContent.split(System.lineSeparator())) {
           String[] tokens = line.split("\t");
-          String groupId = tokens[0];
+          String interpreterGroupId = tokens[0];
           String[] hostPort = tokens[1].split(":");
           int connectTimeout =
-              zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+                  zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
           RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
-              interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
-          // interpreterSettingManager may be null when this class is used when it is used
-          // stop-interpreter.sh
-          clients.put(groupId, client);
-          LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
+              interpreterSettingName, interpreterGroupId, connectTimeout,
+                  interpreterSettingManager.getInterpreterEventServer().getHost(),
+                  interpreterSettingManager.getInterpreterEventServer().getPort(),
+                  hostPort[0], Integer.parseInt(hostPort[1]), true);
+          clients.put(interpreterGroupId, client);
+          LOGGER.info("Recovering Interpreter Process: " + interpreterGroupId + ", " +
+                  hostPort[0] + ":" + hostPort[1]);
         }
       }
     }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
index 3a7d12c..ec6a3b0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
@@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 
@@ -49,6 +50,6 @@ public class NullRecoveryStorage extends RecoveryStorage {
 
   @Override
   public Map<String, InterpreterClient> restore() throws IOException {
-    return null;
+    return new HashMap<>();
   }
 }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
index d74b162..2eb0b04 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
@@ -22,11 +22,12 @@ public class StopInterpreter {
 
   public static void main(String[] args) throws IOException {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
-    RecoveryStorage recoveryStorage = null;
+    InterpreterSettingManager interpreterSettingManager =
+            new InterpreterSettingManager(zConf, null, null, null);
 
-    recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
+    RecoveryStorage recoveryStorage  = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
         new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
-        new Object[] {zConf, null});
+        new Object[] {zConf, interpreterSettingManager});
 
     LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName());
     Map<String, InterpreterClient> restoredClients = recoveryStorage.restore();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index c3678c0..79130a4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -42,8 +42,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
           Pattern.compile("Submitted application (\\w+)");
 
   private final String interpreterRunner;
-  private final int zeppelinServerRPCPort;
-  private final String zeppelinServerRPCHost;
   private final String interpreterPortRange;
   private InterpreterProcessLauncher interpreterProcessLauncher;
   private String host = null;
@@ -59,8 +57,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
 
   public RemoteInterpreterManagedProcess(
       String intpRunner,
-      int zeppelinServerRPCPort,
-      String zeppelinServerRPCHost,
+      int intpEventServerPort,
+      String intpEventServerHost,
       String interpreterPortRange,
       String intpDir,
       String localRepoDir,
@@ -69,10 +67,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
       String interpreterSettingName,
       String interpreterGroupId,
       boolean isUserImpersonated) {
-    super(connectTimeout);
+    super(connectTimeout, intpEventServerHost, intpEventServerPort);
     this.interpreterRunner = intpRunner;
-    this.zeppelinServerRPCPort = zeppelinServerRPCPort;
-    this.zeppelinServerRPCHost = zeppelinServerRPCHost;
     this.interpreterPortRange = interpreterPortRange;
     this.env = env;
     this.interpreterDir = intpDir;
@@ -99,9 +95,9 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
     cmdLine.addArgument("-d", false);
     cmdLine.addArgument(interpreterDir, false);
     cmdLine.addArgument("-c", false);
-    cmdLine.addArgument(zeppelinServerRPCHost, false);
+    cmdLine.addArgument(intpEventServerHost, false);
     cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(String.valueOf(zeppelinServerRPCPort), false);
+    cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
     cmdLine.addArgument("-r", false);
     cmdLine.addArgument(interpreterPortRange, false);
     cmdLine.addArgument("-i", false);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 5d10df1..e3a81ac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -23,6 +23,8 @@ import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -30,13 +32,20 @@ import java.io.IOException;
  * Abstract class for interpreter process
  */
 public abstract class RemoteInterpreterProcess implements InterpreterClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
   private static final Gson GSON = new Gson();
 
   private int connectTimeout;
+  protected String intpEventServerHost;
+  protected int intpEventServerPort;
   private PooledRemoteClient<Client> remoteClient;
 
-  public RemoteInterpreterProcess(int connectTimeout) {
+  public RemoteInterpreterProcess(int connectTimeout,
+                                  String intpEventServerHost,
+                                  int intpEventServerPort) {
     this.connectTimeout = connectTimeout;
+    this.intpEventServerHost = intpEventServerHost;
+    this.intpEventServerPort = intpEventServerPort;
     this.remoteClient = new PooledRemoteClient<Client>(() -> {
       TSocket transport = new TSocket(getHost(), getPort());
       try {
@@ -54,7 +63,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
   }
 
   public void shutdown() {
-    // Close client socket connection
     if (remoteClient != null) {
       remoteClient.shutdown();
     }
@@ -66,7 +74,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
    * @param name
    * @param o
    */
-  public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
+  public void updateRemoteAngularObject(String name,
+                                        String noteId,
+                                        String paragraphId,
+                                        Object o) {
     remoteClient.callRemoteFunction((PooledRemoteClient.RemoteFunction<Void, Client>) client -> {
        client.angularObjectUpdate(name, noteId, paragraphId, GSON.toJson(o));
        return null;
@@ -77,6 +88,21 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     return remoteClient.callRemoteFunction(func);
   }
 
+  @Override
+  public boolean recover() {
+    try {
+      remoteClient.callRemoteFunction(client -> {
+        client.reconnect(intpEventServerHost, intpEventServerPort);
+        return null;
+      });
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Fail to recover remote interpreter process: {}" , e.getMessage());
+      return false;
+    }
+  }
+
+
   /**
    * called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started
    */
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index a33520a..053a72c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -30,19 +30,23 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
   private final int port;
   private final String interpreterSettingName;
   private final String interpreterGroupId;
+  private final boolean isRecovery;
 
   public RemoteInterpreterRunningProcess(
       String interpreterSettingName,
       String interpreterGroupId,
       int connectTimeout,
+      String intpEventServerHost,
+      int intpEventServerPort,
       String host,
-      int port
-  ) {
-    super(connectTimeout);
+      int port,
+      boolean isRecovery) {
+    super(connectTimeout, intpEventServerHost, intpEventServerPort);
     this.interpreterSettingName = interpreterSettingName;
     this.interpreterGroupId = interpreterGroupId;
     this.host = host;
     this.port = port;
+    this.isRecovery = isRecovery;
   }
 
   @Override
@@ -74,7 +78,7 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
   public void stop() {
     // assume process is externally managed. nothing to do. But will kill it
     // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
-    if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
+    if (System.getenv("ZEPPELIN_FORCE_STOP") != null || isRecovery) {
       if (isRunning()) {
         LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId);
         try {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 55cb4f4..9fb8ba8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -73,7 +73,7 @@ public class Note implements JsonSerializable {
   private static final ExclusionStrategy strategy = new ExclusionStrategy() {
     @Override
     public boolean shouldSkipField(FieldAttributes f) {
-      return f.getName().equals("runtimeInfos") || f.getName().equals("path");
+      return f.getName().equals("path");
     }
 
     @Override
@@ -1065,11 +1065,11 @@ public class Note implements JsonSerializable {
 
   public void postProcessParagraphs() {
     for (Paragraph p : paragraphs) {
-      p.cleanRuntimeInfos();
-      p.cleanOutputBuffer();
       p.parseText();
+      p.setNote(this);
+      p.setAuthenticationInfo(AuthenticationInfo.ANONYMOUS);
 
-      if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
+      if (p.getStatus() == Status.PENDING) {
         p.setStatus(Status.ABORT);
       }
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 387a56e..be3a5bb 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -48,6 +48,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
@@ -109,6 +110,21 @@ public class Notebook {
     }
   }
 
+  private void recoverRunningParagraphs() {
+
+    Thread thread = new Thread(() -> {
+      for (Note note : getAllNotes()) {
+        for (Paragraph paragraph : note.getParagraphs()) {
+          if (paragraph.getStatus() == Job.Status.RUNNING) {
+            paragraph.recover();
+          }
+        }
+      }
+    });
+    thread.setName("Recovering-Thread");
+    thread.start();
+  }
+
   @Inject
   public Notebook(
       ZeppelinConfiguration conf,
@@ -134,6 +150,8 @@ public class Notebook {
       this.noteEventListeners.add(noteEventListener);
     }
     this.paragraphJobListener = (ParagraphJobListener) noteEventListener;
+
+    recoverRunningParagraphs();
   }
 
   public NoteManager getNoteManager() {
@@ -545,6 +563,17 @@ public class Notebook {
 
   public List<Note> getAllNotes() {
     List<Note> noteList = noteManager.getAllNotes();
+    for (Note note : noteList) {
+      note.setInterpreterFactory(replFactory);
+      note.setInterpreterSettingManager(interpreterSettingManager);
+      note.setParagraphJobListener(paragraphJobListener);
+      note.setNoteEventListeners(noteEventListeners);
+      note.setCredentials(credentials);
+      for (Paragraph p : note.getParagraphs()) {
+        p.setNote(note);
+        p.setListener(paragraphJobListener);
+      }
+    }
     Collections.sort(noteList, Comparator.comparing(Note::getPath));
     return noteList;
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 0086a47..3d70eb6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -54,6 +54,7 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Job;
@@ -101,12 +102,11 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   // personalized
   private transient Map<String, Paragraph> userParagraphMap = new HashMap<>();
   private transient Map<String, String> localProperties = new HashMap<>();
-  // serialize runtimeInfos to frontend but not to note file (via gson's ExclusionStrategy)
+
   private Map<String, ParagraphRuntimeInfo> runtimeInfos = new HashMap<>();
   private transient List<InterpreterResultMessage> outputBuffer = new ArrayList<>();
 
 
-
   @VisibleForTesting
   Paragraph() {
     super(generateId(), null);
@@ -405,7 +405,9 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   @Override
   protected InterpreterResult jobRun() throws Throwable {
     try {
-      this.runtimeInfos.clear();
+      if (localProperties.getOrDefault("isRecover", "false").equals("false")) {
+        this.runtimeInfos.clear();
+      }
       this.interpreter = getBindedInterpreter();
       if (this.interpreter == null) {
         LOGGER.error("Can not find interpreter name " + intpText);
@@ -505,6 +507,8 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
       }
     } catch (Exception e) {
       return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      localProperties.remove("isRecover");
     }
   }
 
@@ -794,4 +798,51 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
               outputBuffer.size() + " output in outputBuffer");
     }
   }
+
+  public void recover() {
+    try {
+      LOGGER.info("Recovering paragraph: " + getId());
+
+      this.interpreter = getBindedInterpreter();
+      InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
+              interpreter.getInterpreterGroup()).getInterpreterSetting();
+      Map<String, Object> config
+              = interpreterSetting.getConfig(interpreter.getClassName());
+      mergeConfig(config);
+
+      if (shouldSkipRunParagraph()) {
+        LOGGER.info("Skip to run blank paragraph. {}", getId());
+        setStatus(Job.Status.FINISHED);
+        return ;
+      }
+      setStatus(Status.READY);
+      localProperties.put("isRecover", "true");
+      for (List<Interpreter> sessions : this.interpreter.getInterpreterGroup().values()) {
+        for (Interpreter intp : sessions) {
+          // exclude ConfInterpreter
+          if (intp instanceof RemoteInterpreter) {
+            ((RemoteInterpreter) intp).setOpened(true);
+          }
+        }
+      }
+
+      if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) {
+        setAuthenticationInfo(getAuthenticationInfo());
+        interpreter.getScheduler().submit(this);
+      }
+
+    } catch (InterpreterNotFoundException e) {
+      InterpreterResult intpResult =
+              new InterpreterResult(InterpreterResult.Code.ERROR,
+                      String.format("Interpreter %s not found", this.intpText));
+      setReturn(intpResult, e);
+      setStatus(Job.Status.ERROR);
+    } catch (Throwable e) {
+      InterpreterResult intpResult =
+              new InterpreterResult(InterpreterResult.Code.ERROR,
+                      "Unexpected exception: " + ExceptionUtils.getStackTrace(e));
+      setReturn(intpResult, e);
+      setStatus(Job.Status.ERROR);
+    }
+  }
 }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
index d232c25..e73f79e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -23,7 +23,6 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index c768e54..84e7fbe 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -27,7 +27,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job.Status;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;