Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/05 06:11:17 UTC

[zeppelin] branch master updated: [ZEPPELIN-4974]. Support resume flink job from checkpoint

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new dfe3a0e  [ZEPPELIN-4974]. Support resume flink job from checkpoint
dfe3a0e is described below

commit dfe3a0e933ba71b89ddbefb37ee84715f9478940
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Jul 29 13:55:36 2020 +0800

    [ZEPPELIN-4974]. Support resume flink job from checkpoint
    
    ### What is this PR for?
    
    This PR saves the Flink job's latest checkpoint path into the paragraph's config, and resumes the Flink job from that checkpoint when the paragraph local property `resumeFromLatestCheckPoint` is `true`.
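
    For example, a streaming SQL paragraph opts in via the local property. The snippet
    below is the same paragraph used by the new ZeppelinFlinkClusterTest in this commit
    (the `log` table is registered by the init_stream.scala test resource):

        %flink.ssql(type=single, template=<h1>Total: {0}</h1>, resumeFromLatestCheckPoint=true)
        select count(1) from log;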
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4974
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3870 from zjffdu/ZEPPELIN-4974 and squashes the following commits:
    
    8edca4775 [Jeff Zhang] [ZEPPELIN-4974]. Support resume flink job from checkpoint
---
 .travis.yml                                        |    4 +-
 docs/interpreter/flink.md                          |   11 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |    4 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |   41 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |   40 +-
 .../zeppelin/flink/FlinkInterpreterTest.java       |    4 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       |   25 +-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    |    4 +-
 .../src/test/resources/log4j.properties            |    1 +
 .../integration/FlinkIntegrationTest110.java       |    1 -
 .../integration/FlinkIntegrationTest111.java       |    1 -
 .../integration/ZeppelinFlinkClusterTest.java      |  170 +++
 ...st110.java => ZeppelinFlinkClusterTest110.java} |    5 +-
 ...st111.java => ZeppelinFlinkClusterTest111.java} |    5 +-
 .../src/test/resources/init_stream.scala           |   46 +
 .../zeppelin/interpreter/InterpreterContext.java   |    5 +-
 .../remote/RemoteInterpreterEventClient.java       |   11 +
 .../interpreter/thrift/AngularObjectId.java        |    2 +-
 .../interpreter/thrift/AppOutputAppendEvent.java   |    2 +-
 .../interpreter/thrift/AppOutputUpdateEvent.java   |    2 +-
 .../interpreter/thrift/AppStatusUpdateEvent.java   |    2 +-
 .../interpreter/thrift/InterpreterCompletion.java  |    2 +-
 .../interpreter/thrift/OutputAppendEvent.java      |    2 +-
 .../interpreter/thrift/OutputUpdateAllEvent.java   |    2 +-
 .../interpreter/thrift/OutputUpdateEvent.java      |    2 +-
 .../zeppelin/interpreter/thrift/ParagraphInfo.java |    2 +-
 .../zeppelin/interpreter/thrift/RegisterInfo.java  |    2 +-
 .../thrift/RemoteApplicationResult.java            |    2 +-
 .../thrift/RemoteInterpreterContext.java           |    2 +-
 .../interpreter/thrift/RemoteInterpreterEvent.java |    2 +-
 .../thrift/RemoteInterpreterEventService.java      | 1104 +++++++++++++++++++-
 .../thrift/RemoteInterpreterEventType.java         |    2 +-
 .../thrift/RemoteInterpreterResult.java            |    2 +-
 .../thrift/RemoteInterpreterResultMessage.java     |    2 +-
 .../thrift/RemoteInterpreterService.java           |    2 +-
 .../interpreter/thrift/RunParagraphsEvent.java     |    2 +-
 .../interpreter/thrift/ServiceException.java       |    2 +-
 .../zeppelin/interpreter/thrift/WebUrlInfo.java    |    2 +-
 .../thrift/RemoteInterpreterEventService.thrift    |    1 +
 .../apache/zeppelin/service/NotebookService.java   |    8 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |    8 -
 .../interpreter/RemoteInterpreterEventServer.java  |   23 +-
 .../interpreter/remote/RemoteInterpreter.java      |   12 +-
 .../org/apache/zeppelin/notebook/Paragraph.java    |   16 +-
 44 files changed, 1449 insertions(+), 141 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 33ed2a9..465ead8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -98,12 +98,12 @@ jobs:
     # Test flink 1.10 & flink integration test
     - jdk: "openjdk8"
       dist: xenial
-      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110"
+      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110,ZeppelinFlinkClusterTest110"
 
     # Test flink 1.11 & flink integration test
     - jdk: "openjdk8"
       dist: xenial
-      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.1" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest111"
+      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.1" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest111,ZeppelinFlinkClusterTest111"
 
     # Run Spark integration test and unit test
 
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 5ae7798..3dfffa3 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -452,14 +452,19 @@ In this section, we will list and explain all the supported local properties in
     <td>Used in %flink.ssql & %flink.bsql to specify the flink sql job max parallelism in case you want to change parallelism later. For more details, refer this [link](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/parallel.html#setting-the-maximum-parallelism) </td>
   </tr>
   <tr>
-    <td>savepointDir</td>
+    <td>savePointDir</td>
     <td></td>
     <td>If you specify it, then when you cancel your flink job in Zeppelin, it would also do savepoint and store state in this directory. And when you resume your job, it would resume from this savepoint.</td>
   </tr>
   <tr>
-    <td>savepointPath</td>
+    <td>resumeFromSavePoint</td>
     <td></td>
-    <td>If you specify it, then when you resume your job, it would resume from this savepointPath .</td>
+    <td>Resume the Flink job from the savepoint if savePointDir is specified.</td>
+  </tr>
+  <tr>
+    <td>resumeFromLatestCheckPoint</td>
+    <td></td>
+    <td>Resume the Flink job from the latest checkpoint if checkpointing is enabled.</td>
   </tr>
   <tr>
     <td>runAsOne</td>
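
As an illustration of how the savepoint-related properties above combine (a sketch only:
the paragraph mirrors the pattern used in FlinkStreamSqlInterpreterTest, and the directory
path is a placeholder):

    %flink.ssql(type=update, savePointDir=/tmp/flink-savepoints, resumeFromSavePoint=true)
    select url, count(1) as pv from log group by url;

Cancelling such a paragraph stops the job with a savepoint under savePointDir; the resulting
savepoint path is stored in the paragraph config and is used on the next run unless
resumeFromSavePoint is set to false.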
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d624c43..d4cf347 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -21,8 +21,6 @@ import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.delegation.Planner;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -184,7 +182,7 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   public void setSavePointIfNecessary(InterpreterContext context) {
-    this.innerIntp.setSavePointIfNecessary(context);
+    this.innerIntp.setSavePointPathIfNecessary(context);
   }
 
   public void setParallelismIfNecessary(InterpreterContext context) {
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 0c553ba..feaf7a7 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -38,6 +38,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class JobManager {
 
   private static Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
+  public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path";
+  public static final String SAVEPOINT_PATH = "savepoint_path";
+  public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavePoint";
+  public static final String RESUME_FROM_CHECKPOINT = "resumeFromLatestCheckPoint";
+  public static final String SAVEPOINT_DIR = "savePointDir";
+
 
   private Map<String, JobClient> jobs = new HashMap<>();
   private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
@@ -129,15 +135,18 @@ public class JobManager {
 
     boolean cancelled = false;
     try {
-      String savepointDir = context.getLocalProperties().get("savepointDir");
-      if (StringUtils.isBlank(savepointDir)) {
+      String savePointDir = context.getLocalProperties().get(SAVEPOINT_DIR);
+      if (StringUtils.isBlank(savePointDir)) {
         LOGGER.info("Trying to cancel job of paragraph {}", context.getParagraphId());
         jobClient.cancel();
       } else {
         LOGGER.info("Trying to stop job of paragraph {} with save point dir: {}",
-                context.getParagraphId(), savepointDir);
-        String savePointPath = jobClient.stopWithSavepoint(true, savepointDir).get();
-        z.angularBind(context.getParagraphId() + "_savepointpath", savePointPath);
+                context.getParagraphId(), savePointDir);
+        String savePointPath = jobClient.stopWithSavepoint(true, savePointDir).get();
+        Map<String, String> config = new HashMap<>();
+        config.put(SAVEPOINT_PATH, savePointPath);
+        context.getIntpEventClient().updateParagraphConfig(
+                context.getNoteId(), context.getParagraphId(), config);
         LOGGER.info("Job {} of paragraph {} is stopped with save point path: {}",
                 jobClient.getJobID(), context.getParagraphId(), savePointPath);
       }
@@ -230,8 +239,28 @@ public class JobManager {
                     context.getNoteId(),
                     context.getParagraphId());
           }
+
+          // Fetch checkpoint info and save the latest checkpoint path into the paragraph's config.
+          rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString() + "/checkpoints")
+                  .asJson().getBody();
+          if (rootNode.getObject().has("latest")) {
+            JSONObject latestObject = rootNode.getObject().getJSONObject("latest");
+            if (latestObject.has("completed") && latestObject.get("completed") instanceof JSONObject) {
+              JSONObject completedObject = latestObject.getJSONObject("completed");
+              if (completedObject.has("external_path")) {
+                String checkpointPath = completedObject.getString("external_path");
+                LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
+                if (!StringUtils.isBlank(checkpointPath)) {
+                  Map<String, String> config = new HashMap<>();
+                  config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
+                  context.getIntpEventClient().updateParagraphConfig(
+                          context.getNoteId(), context.getParagraphId(), config);
+                }
+              }
+            }
+          }
         } catch (Exception e) {
-          LOGGER.error("Fail to poll flink job progress via rest api, rest api: " + rootNode, e);
+          LOGGER.error("Fail to poll flink job progress via rest api", e);
         }
       }
     }
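
For reference, the polling code above reads only three nested fields from Flink's
/jobs/<jobId>/checkpoints REST endpoint: "latest", "completed" and "external_path".
A trimmed response is sketched below; apart from those field names, the values and the
remaining structure are illustrative:

    {
      "latest": {
        "completed": {
          "id": 42,
          "status": "COMPLETED",
          "external_path": "file:///tmp/checkpoints/<job-id>/chk-42",
          ...
        }
      }
    }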
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index c43ff31..81982db 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -640,31 +640,31 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
   }
 
-  def setSavePointIfNecessary(context: InterpreterContext): Unit = {
-    val savepointDir = context.getLocalProperties.get("savepointDir")
-    val savepointPath = context.getLocalProperties.get("savepointPath");
-
-    if (!StringUtils.isBlank(savepointPath)){
-      LOGGER.info("savepointPath has been setup by user , savepointPath = {}", savepointPath)
+  /**
+   * Use the savepoint path first; if there is no savepoint, fall back to the latest checkpoint path.
+   *
+   * @param context interpreter context carrying the paragraph config and local properties
+   */
+  def setSavePointPathIfNecessary(context: InterpreterContext): Unit = {
+    val savepointPath = context.getConfig.getOrDefault(JobManager.SAVEPOINT_PATH, "").toString
+    val resumeFromSavePoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_SAVEPOINT, true)
+    if (!StringUtils.isBlank(savepointPath) && resumeFromSavePoint) {
+      LOGGER.info("Resume job from savepoint, savepointPath = {}", savepointPath)
       configuration.setString("execution.savepoint.path", savepointPath)
       return
-    } else if ("".equals(savepointPath)) {
-      LOGGER.info("savepointPath is empty, remove execution.savepoint.path")
-      configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
-      return;
     }
 
-    if (!StringUtils.isBlank(savepointDir)) {
-      val savepointPath = z.angular(context.getParagraphId + "_savepointpath", context.getNoteId, null)
-      if (savepointPath == null) {
-        LOGGER.info("savepointPath is null because it is the first run")
-        // remove the SAVEPOINT_PATH which may be set by last job.
-        configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
-      } else {
-        LOGGER.info("Set savepointPath to: " + savepointPath.toString)
-        configuration.setString("execution.savepoint.path", savepointPath.toString)
-      }
+    val checkpointPath = context.getConfig.getOrDefault(JobManager.LATEST_CHECKPOINT_PATH, "").toString
+    val resumeFromLatestCheckpoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_CHECKPOINT, false)
+    if (!StringUtils.isBlank(checkpointPath) && resumeFromLatestCheckpoint) {
+      LOGGER.info("Resume job from checkpoint , checkpointPath = {}", checkpointPath)
+      configuration.setString("execution.savepoint.path", checkpointPath)
+      return
     }
+
+    // remove the SAVEPOINT_PATH which may be set by last job.
+    LOGGER.info("Start flink job without any checkpoint and savepoint, remove execution.savepoint.path")
+    configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
   }
 
   def setParallelismIfNecessary(context: InterpreterContext): Unit = {
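
Note that both branches above funnel into the same Flink setting, execution.savepoint.path
(SavepointConfigOptions.SAVEPOINT_PATH), so resuming from a retained externalized checkpoint
works the same way as resuming from a savepoint. A minimal sketch for inspecting the effective
restore path after this method runs, assuming access to the same `configuration` object used
above (illustration only, not part of the change):

    // Logs the resolved restore path, or null if none was set.
    val restorePath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH)
    LOGGER.info("Effective restore path: {}", restorePath)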
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 6136108..37f3309 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -357,7 +357,7 @@ public class FlinkInterpreterTest {
       try {
         InterpreterContext context = getInterpreterContext();
         context.getLocalProperties().put("type", "update");
-        context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = interpreter.interpret(
@@ -381,7 +381,7 @@ public class FlinkInterpreterTest {
 
     InterpreterContext context = getInterpreterContext();
     context.getLocalProperties().put("type", "update");
-    context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     interpreter.cancel(context);
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index f9b3581..837ec38 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -216,7 +216,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
       try {
         InterpreterContext context = getInterpreterContext();
         context.getLocalProperties().put("type", "update");
-        context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
@@ -239,7 +239,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
 
     InterpreterContext context = getInterpreterContext();
     context.getLocalProperties().put("type", "update");
-    context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.cancel(context);
@@ -268,7 +268,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
       try {
         InterpreterContext context = getInterpreterContext();
         context.getLocalProperties().put("type", "update");
-        context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
@@ -289,7 +289,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
 
     InterpreterContext context = getInterpreterContext();
     context.getLocalProperties().put("type", "update");
-    context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.cancel(context);
@@ -303,7 +303,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     String savepointPath = savePointDir.getAbsolutePath().concat(File.separator).concat(allSavepointPath[0]);
 
     // resume job from exist savepointPath
-    context.getLocalProperties().put("savepointPath",savepointPath);
+    context.getConfig().put(JobManager.SAVEPOINT_PATH,savepointPath);
     sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -316,25 +316,22 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
 
   @Test
   public void testResumeStreamSqlFromInvalidSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
-    String initStreamScalaScript = getInitStreamScript(1000);
+    String initStreamScalaScript = getInitStreamScript(2000);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    File savepointPath = FileUtils.getTempDirectory();
     InterpreterContext context = getInterpreterContext();
     context.getLocalProperties().put("type", "update");
-    context.getLocalProperties().put("savepointPath", savepointPath.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "1");
     context.getLocalProperties().put("maxParallelism", "10");
-    InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
-            "log group by url", context);
+    context.getConfig().put(JobManager.SAVEPOINT_PATH, "/invalid_savepoint");
 
-    // due to invalid savepointPath, failed to submit job and throw exception
-    assertEquals(InterpreterResult.Code.ERROR, result2.code());
-    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.toString().contains("Failed to submit job."));
+    result = sqlInterpreter.interpret("select url, count(1) as pv from " +
+            "log group by url", context);
 
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint"));
   }
 
   @Test
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index a2d380f..096e0f7 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -426,7 +426,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
       try {
         InterpreterContext context = createInterpreterContext();
         context.getLocalProperties().put("type", "update");
-        context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = interpreter.interpret(
@@ -449,7 +449,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
     InterpreterContext context = createInterpreterContext();
     context.getLocalProperties().put("type", "update");
-    context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     interpreter.cancel(context);
diff --git a/flink/interpreter/src/test/resources/log4j.properties b/flink/interpreter/src/test/resources/log4j.properties
index fd05cc0..bb2d779 100644
--- a/flink/interpreter/src/test/resources/log4j.properties
+++ b/flink/interpreter/src/test/resources/log4j.properties
@@ -24,6 +24,7 @@ log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 log4j.logger.org.apache.hive=WARN
 log4j.logger.org.apache.flink=WARN
 log4j.logger.org.apache.zeppelin.flink=WARN
+log4j.logger.org.apache.zeppelin.flink.JobManager=DEBUG
 log4j.logger.org.apache.zeppelin.python=WARN
 log4j.logger.org.apache.flink.streaming.api.operators.collect=ERROR
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
index 1d7bac5..b8cf293 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -29,7 +29,6 @@ public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-//            {"1.10.0"},
             {"1.10.1"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
index b495844..108a459 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
@@ -29,7 +29,6 @@ public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.11.0"},
             {"1.11.1"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
new file mode 100644
index 0000000..885be23
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import com.google.common.io.Files;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinFlinkClusterTest.class);
+  private String flinkVersion;
+  private String flinkHome;
+
+  public ZeppelinFlinkClusterTest(String flinkVersion) throws Exception {
+    this.flinkVersion = flinkVersion;
+    LOGGER.info("Testing FlinkVersion: " + flinkVersion);
+    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+            "helium");
+    AbstractTestRestApi.startUp(ZeppelinFlinkClusterTest.class.getSimpleName());
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
+
+  @Test
+  public void testResumeFromCheckpoint() throws Exception {
+
+    Note note = null;
+    try {
+      // create new note
+      note = TestUtils.getInstance(Notebook.class).createNote("note1", AuthenticationInfo.ANONYMOUS);
+
+      // run p0 for %flink.conf
+      String checkpointPath = Files.createTempDir().getAbsolutePath();
+      Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      StringBuilder builder = new StringBuilder("%flink.conf\n");
+      builder.append("FLINK_HOME " + flinkHome + "\n");
+      builder.append("flink.execution.mode local\n");
+      builder.append("state.checkpoints.dir file://" + checkpointPath + "\n");
+      builder.append("execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION");
+      p0.setText(builder.toString());
+      note.run(p0.getId(), true);
+      assertEquals(Job.Status.FINISHED, p0.getStatus());
+
+      // run p1 for creating flink table via scala
+      Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%flink " + getInitStreamScript(1000));
+      note.run(p1.getId(), true);
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+
+      // run p2 for flink streaming sql
+      Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p2.setText("%flink.ssql(type=single, template=<h1>Total: {0}</h1>, resumeFromLatestCheckPoint=true)\n" +
+              "select count(1) from log;");
+      note.run(p2.getId(), false);
+      p2.waitUntilRunning();
+
+      Thread.sleep(30 * 1000);
+      TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
+              .getInterpreterSettingByName("flink").close();
+      assertTrue(p2.getConfig().toString(), p2.getConfig().get("latest_checkpoint_path").toString().contains(checkpointPath));
+
+      // run it again
+      note.run(p0.getId(), true);
+      note.run(p1.getId(), true);
+      note.run(p2.getId(), false);
+      p2.waitUntilFinished();
+      assertEquals(p2.getReturn().toString(), Job.Status.FINISHED, p2.getStatus());
+
+    } finally {
+      if (null != note) {
+        TestUtils.getInstance(Notebook.class).removeNote(note, AuthenticationInfo.ANONYMOUS);
+      }
+    }
+  }
+
+  @Test
+  public void testResumeFromInvalidCheckpoint() throws Exception {
+
+    Note note = null;
+    try {
+      // create new note
+      note = TestUtils.getInstance(Notebook.class).createNote("note2", AuthenticationInfo.ANONYMOUS);
+
+      // run p0 for %flink.conf
+      String checkpointPath = Files.createTempDir().getAbsolutePath();
+      Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      StringBuilder builder = new StringBuilder("%flink.conf\n");
+      builder.append("FLINK_HOME " + flinkHome + "\n");
+      builder.append("flink.execution.mode local\n");
+      builder.append("state.checkpoints.dir file://" + checkpointPath + "\n");
+      builder.append("execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION");
+      p0.setText(builder.toString());
+      note.run(p0.getId(), true);
+      assertEquals(Job.Status.FINISHED, p0.getStatus());
+
+      // run p1 for creating flink table via scala
+      Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%flink " + getInitStreamScript(500));
+      note.run(p1.getId(), true);
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+
+      // run p2 for flink streaming sql
+      Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p2.setText("%flink.ssql(type=single, template=<h1>Total: {0}</h1>, resumeFromLatestCheckPoint=true)\n" +
+              "select count(1) from log;");
+      p2.getConfig().put("latest_checkpoint_path", "file:///invalid_checkpoint");
+      note.run(p2.getId(), false);
+      p2.waitUntilFinished();
+      assertEquals(p2.getReturn().toString(), Job.Status.ERROR, p2.getStatus());
+      assertTrue(p2.getReturn().toString(), p2.getReturn().toString().contains("Cannot find checkpoint"));
+
+      p2.setText("%flink.ssql(type=single, template=<h1>Total: {0}</h1>, resumeFromLatestCheckPoint=false)\n" +
+              "select count(1) from log;");
+      note.run(p2.getId(), false);
+      p2.waitUntilFinished();
+      assertEquals(p2.getReturn().toString(), Job.Status.FINISHED, p2.getStatus());
+    } finally {
+      if (null != note) {
+        TestUtils.getInstance(Notebook.class).removeNote(note, AuthenticationInfo.ANONYMOUS);
+      }
+    }
+  }
+
+  public static String getInitStreamScript(int sleep_interval) throws IOException {
+    return IOUtils.toString(FlinkIntegrationTest.class.getResource("/init_stream.scala"))
+            .replace("{{sleep_interval}}", sleep_interval + "");
+  }
+}
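
To run the new cluster test outside CI, something along these lines should work (assembled
from the BUILD_FLAG/TEST_FLAG/PROFILE/MODULES variables in the .travis.yml change above;
use -Pflink-1.11 and ZeppelinFlinkClusterTest111 for Flink 1.11, and adjust to your environment):

    # build the modules needed by the integration test
    mvn install -DskipTests -DskipRat -am -Pflink-1.10 -Pintegration \
        -pl flink/interpreter,zeppelin-interpreter-integration
    # run the checkpoint/savepoint cluster test
    mvn test -DskipRat -Pflink-1.10 -Pintegration \
        -pl zeppelin-interpreter-integration \
        -Dtest=ZeppelinFlinkClusterTest110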
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
similarity index 88%
copy from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
copy to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
index 1d7bac5..0ec15b8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
@@ -24,17 +24,16 @@ import java.util.Arrays;
 import java.util.List;
 
 @RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
+public class ZeppelinFlinkClusterTest110 extends ZeppelinFlinkClusterTest {
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-//            {"1.10.0"},
             {"1.10.1"}
     });
   }
 
-  public FlinkIntegrationTest110(String flinkVersion) {
+  public ZeppelinFlinkClusterTest110(String flinkVersion) throws Exception {
     super(flinkVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java
similarity index 88%
copy from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
copy to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java
index b495844..811742b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java
@@ -24,17 +24,16 @@ import java.util.Arrays;
 import java.util.List;
 
 @RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
+public class ZeppelinFlinkClusterTest111 extends ZeppelinFlinkClusterTest {
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.11.0"},
             {"1.11.1"}
     });
   }
 
-  public FlinkIntegrationTest111(String flinkVersion) {
+  public ZeppelinFlinkClusterTest111(String flinkVersion) throws Exception {
     super(flinkVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/resources/init_stream.scala b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala
new file mode 100644
index 0000000..f8d27ae
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala
@@ -0,0 +1,46 @@
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
+import java.util.Collections
+import scala.collection.JavaConversions._
+
+senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+senv.enableCheckpointing(5000)
+
+val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
+
+  val pages = Seq("home", "search", "search", "product", "product", "product")
+  var count: Long = 0
+  var running : Boolean = true
+  // startTime is 2018/1/1
+  var startTime: Long = new java.util.Date(2018 - 1900,0,1).getTime
+  var sleepInterval = {{sleep_interval}}
+
+  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (count < 60 && running) {
+      lock.synchronized({
+        ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
+        count += 1
+        Thread.sleep(sleepInterval)
+      })
+    }
+  }
+
+  override def cancel(): Unit = {
+    running = false
+  }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
+    Collections.singletonList(count)
+  }
+
+  override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
+    state.foreach(s => count = s)
+  }
+
+}).assignAscendingTimestamps(_._1)
+
+stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 2c20806..0350aa8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter;
 
+import com.google.common.collect.Maps;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
@@ -122,7 +123,9 @@ public class InterpreterContext {
     }
 
     public Builder setConfig(Map<String, Object> config) {
-      context.config = config;
+      if (config != null) {
+        context.config = Maps.newHashMap(config);
+      }
       return this;
     }
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index e6dab3e..2fb0733 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -386,4 +386,15 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
       LOGGER.warn("Fail to remove AngularObject", e);
     }
   }
+
+  public void updateParagraphConfig(String noteId, String paragraphId, Map<String, String> config) {
+    try {
+      callRemoteFunction(client -> {
+        client.updateParagraphConfig(noteId, paragraphId, config);
+        return null;
+      });
+    } catch (Exception e) {
+      LOGGER.warn("Fail to updateParagraphConfig", e);
+    }
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
index 6e5b452..960204d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
index e9cee37..e245312 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class AppOutputAppendEvent implements org.apache.thrift.TBase<AppOutputAppendEvent, AppOutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputAppendEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
index 55e410b..fb6cf9e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class AppOutputUpdateEvent implements org.apache.thrift.TBase<AppOutputUpdateEvent, AppOutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
index 5f612a0..a25cc3b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class AppStatusUpdateEvent implements org.apache.thrift.TBase<AppStatusUpdateEvent, AppStatusUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppStatusUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
index ccbf0c3..8ad9da1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
index d540c26..a7db8e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class OutputAppendEvent implements org.apache.thrift.TBase<OutputAppendEvent, OutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputAppendEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
index 75242d9..6d2238f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class OutputUpdateAllEvent implements org.apache.thrift.TBase<OutputUpdateAllEvent, OutputUpdateAllEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateAllEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
index 054f4ee..f53470c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class OutputUpdateEvent implements org.apache.thrift.TBase<OutputUpdateEvent, OutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
index 17514b0..804951c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class ParagraphInfo implements org.apache.thrift.TBase<ParagraphInfo, ParagraphInfo._Fields>, java.io.Serializable, Cloneable, Comparable<ParagraphInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ParagraphInfo");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
index dd6f4b2..d2d19c9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RegisterInfo implements org.apache.thrift.TBase<RegisterInfo, RegisterInfo._Fields>, java.io.Serializable, Cloneable, Comparable<RegisterInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
index 4669fe2..5e5434a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index d15be40..a39e099 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 8e46bed..c7153c3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
index 52263a1..e59f9f0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterEventService {
 
   public interface Iface {
@@ -57,6 +57,8 @@ public class RemoteInterpreterEventService {
 
     public void sendParagraphInfo(java.lang.String intpGroupId, java.lang.String json) throws org.apache.thrift.TException;
 
+    public void updateParagraphConfig(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config) throws org.apache.thrift.TException;
+
     public java.util.List<java.lang.String> getAllResources(java.lang.String intpGroupId) throws org.apache.thrift.TException;
 
     public java.nio.ByteBuffer getResource(java.lang.String resourceIdJson) throws org.apache.thrift.TException;
@@ -97,6 +99,8 @@ public class RemoteInterpreterEventService {
 
     public void sendParagraphInfo(java.lang.String intpGroupId, java.lang.String json, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
 
+    public void updateParagraphConfig(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
+
     public void getAllResources(java.lang.String intpGroupId, org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>> resultHandler) throws org.apache.thrift.TException;
 
     public void getResource(java.lang.String resourceIdJson, org.apache.thrift.async.AsyncMethodCallback<java.nio.ByteBuffer> resultHandler) throws org.apache.thrift.TException;
@@ -414,6 +418,28 @@ public class RemoteInterpreterEventService {
       return;
     }
 
+    public void updateParagraphConfig(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config) throws org.apache.thrift.TException
+    {
+      send_updateParagraphConfig(noteId, paragraphId, config);
+      recv_updateParagraphConfig();
+    }
+
+    public void send_updateParagraphConfig(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config) throws org.apache.thrift.TException
+    {
+      updateParagraphConfig_args args = new updateParagraphConfig_args();
+      args.setNoteId(noteId);
+      args.setParagraphId(paragraphId);
+      args.setConfig(config);
+      sendBase("updateParagraphConfig", args);
+    }
+
+    public void recv_updateParagraphConfig() throws org.apache.thrift.TException
+    {
+      updateParagraphConfig_result result = new updateParagraphConfig_result();
+      receiveBase(result, "updateParagraphConfig");
+      return;
+    }
+
     public java.util.List<java.lang.String> getAllResources(java.lang.String intpGroupId) throws org.apache.thrift.TException
     {
       send_getAllResources(intpGroupId);
@@ -998,6 +1024,44 @@ public class RemoteInterpreterEventService {
       }
     }
 
+    public void updateParagraphConfig(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      updateParagraphConfig_call method_call = new updateParagraphConfig_call(noteId, paragraphId, config, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class updateParagraphConfig_call extends org.apache.thrift.async.TAsyncMethodCall<Void> {
+      private java.lang.String noteId;
+      private java.lang.String paragraphId;
+      private java.util.Map<java.lang.String,java.lang.String> config;
+      public updateParagraphConfig_call(java.lang.String noteId, java.lang.String paragraphId, java.util.Map<java.lang.String,java.lang.String> config, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.noteId = noteId;
+        this.paragraphId = paragraphId;
+        this.config = config;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("updateParagraphConfig", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        updateParagraphConfig_args args = new updateParagraphConfig_args();
+        args.setNoteId(noteId);
+        args.setParagraphId(paragraphId);
+        args.setConfig(config);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public Void getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new java.lang.IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return null;
+      }
+    }
+
     public void getAllResources(java.lang.String intpGroupId, org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>> resultHandler) throws org.apache.thrift.TException {
       checkReady();
       getAllResources_call method_call = new getAllResources_call(intpGroupId, resultHandler, this, ___protocolFactory, ___transport);
@@ -1159,6 +1223,7 @@ public class RemoteInterpreterEventService {
       processMap.put("removeAngularObject", new removeAngularObject());
       processMap.put("sendWebUrl", new sendWebUrl());
       processMap.put("sendParagraphInfo", new sendParagraphInfo());
+      processMap.put("updateParagraphConfig", new updateParagraphConfig());
       processMap.put("getAllResources", new getAllResources());
       processMap.put("getResource", new getResource());
       processMap.put("invokeMethod", new invokeMethod());
@@ -1516,6 +1581,31 @@ public class RemoteInterpreterEventService {
       }
     }
 
+    public static class updateParagraphConfig<I extends Iface> extends org.apache.thrift.ProcessFunction<I, updateParagraphConfig_args> {
+      public updateParagraphConfig() {
+        super("updateParagraphConfig");
+      }
+
+      public updateParagraphConfig_args getEmptyArgsInstance() {
+        return new updateParagraphConfig_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      @Override
+      protected boolean rethrowUnhandledExceptions() {
+        return false;
+      }
+
+      public updateParagraphConfig_result getResult(I iface, updateParagraphConfig_args args) throws org.apache.thrift.TException {
+        updateParagraphConfig_result result = new updateParagraphConfig_result();
+        iface.updateParagraphConfig(args.noteId, args.paragraphId, args.config);
+        return result;
+      }
+    }
+
     public static class getAllResources<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getAllResources_args> {
       public getAllResources() {
         super("getAllResources");
@@ -1647,6 +1737,7 @@ public class RemoteInterpreterEventService {
       processMap.put("removeAngularObject", new removeAngularObject());
       processMap.put("sendWebUrl", new sendWebUrl());
       processMap.put("sendParagraphInfo", new sendParagraphInfo());
+      processMap.put("updateParagraphConfig", new updateParagraphConfig());
       processMap.put("getAllResources", new getAllResources());
       processMap.put("getResource", new getResource());
       processMap.put("invokeMethod", new invokeMethod());
@@ -2494,6 +2585,66 @@ public class RemoteInterpreterEventService {
       }
     }
 
+    public static class updateParagraphConfig<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, updateParagraphConfig_args, Void> {
+      public updateParagraphConfig() {
+        super("updateParagraphConfig");
+      }
+
+      public updateParagraphConfig_args getEmptyArgsInstance() {
+        return new updateParagraphConfig_args();
+      }
+
+      public org.apache.thrift.async.AsyncMethodCallback<Void> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new org.apache.thrift.async.AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            updateParagraphConfig_result result = new updateParagraphConfig_result();
+            try {
+              fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+            } catch (org.apache.thrift.transport.TTransportException e) {
+              _LOGGER.error("TTransportException writing to internal frame buffer", e);
+              fb.close();
+            } catch (java.lang.Exception e) {
+              _LOGGER.error("Exception writing to internal frame buffer", e);
+              onError(e);
+            }
+          }
+          public void onError(java.lang.Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TSerializable msg;
+            updateParagraphConfig_result result = new updateParagraphConfig_result();
+            if (e instanceof org.apache.thrift.transport.TTransportException) {
+              _LOGGER.error("TTransportException inside handler", e);
+              fb.close();
+              return;
+            } else if (e instanceof org.apache.thrift.TApplicationException) {
+              _LOGGER.error("TApplicationException inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TApplicationException)e;
+            } else {
+              _LOGGER.error("Exception inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+            } catch (java.lang.Exception ex) {
+              _LOGGER.error("Exception writing to internal frame buffer", ex);
+              fb.close();
+            }
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, updateParagraphConfig_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+        iface.updateParagraphConfig(args.noteId, args.paragraphId, args.config,resultHandler);
+      }
+    }
+
     public static class getAllResources<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getAllResources_args, java.util.List<java.lang.String>> {
       public getAllResources() {
         super("getAllResources");
@@ -12218,6 +12369,889 @@ public class RemoteInterpreterEventService {
     }
   }
 
+  public static class updateParagraphConfig_args implements org.apache.thrift.TBase<updateParagraphConfig_args, updateParagraphConfig_args._Fields>, java.io.Serializable, Cloneable, Comparable<updateParagraphConfig_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("updateParagraphConfig_args");
+
+    private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
+    private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
+    private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.MAP, (short)3);
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new updateParagraphConfig_argsStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new updateParagraphConfig_argsTupleSchemeFactory();
+
+    public @org.apache.thrift.annotation.Nullable java.lang.String noteId; // required
+    public @org.apache.thrift.annotation.Nullable java.lang.String paragraphId; // required
+    public @org.apache.thrift.annotation.Nullable java.util.Map<java.lang.String,java.lang.String> config; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      NOTE_ID((short)1, "noteId"),
+      PARAGRAPH_ID((short)2, "paragraphId"),
+      CONFIG((short)3, "config");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // NOTE_ID
+            return NOTE_ID;
+          case 2: // PARAGRAPH_ID
+            return PARAGRAPH_ID;
+          case 3: // CONFIG
+            return CONFIG;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.CONFIG, new org.apache.thrift.meta_data.FieldMetaData("config", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
+              new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
+              new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(updateParagraphConfig_args.class, metaDataMap);
+    }
+
+    public updateParagraphConfig_args() {
+    }
+
+    public updateParagraphConfig_args(
+      java.lang.String noteId,
+      java.lang.String paragraphId,
+      java.util.Map<java.lang.String,java.lang.String> config)
+    {
+      this();
+      this.noteId = noteId;
+      this.paragraphId = paragraphId;
+      this.config = config;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public updateParagraphConfig_args(updateParagraphConfig_args other) {
+      if (other.isSetNoteId()) {
+        this.noteId = other.noteId;
+      }
+      if (other.isSetParagraphId()) {
+        this.paragraphId = other.paragraphId;
+      }
+      if (other.isSetConfig()) {
+        java.util.Map<java.lang.String,java.lang.String> __this__config = new java.util.HashMap<java.lang.String,java.lang.String>(other.config);
+        this.config = __this__config;
+      }
+    }
+
+    public updateParagraphConfig_args deepCopy() {
+      return new updateParagraphConfig_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.noteId = null;
+      this.paragraphId = null;
+      this.config = null;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.String getNoteId() {
+      return this.noteId;
+    }
+
+    public updateParagraphConfig_args setNoteId(@org.apache.thrift.annotation.Nullable java.lang.String noteId) {
+      this.noteId = noteId;
+      return this;
+    }
+
+    public void unsetNoteId() {
+      this.noteId = null;
+    }
+
+    /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
+    public boolean isSetNoteId() {
+      return this.noteId != null;
+    }
+
+    public void setNoteIdIsSet(boolean value) {
+      if (!value) {
+        this.noteId = null;
+      }
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.String getParagraphId() {
+      return this.paragraphId;
+    }
+
+    public updateParagraphConfig_args setParagraphId(@org.apache.thrift.annotation.Nullable java.lang.String paragraphId) {
+      this.paragraphId = paragraphId;
+      return this;
+    }
+
+    public void unsetParagraphId() {
+      this.paragraphId = null;
+    }
+
+    /** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */
+    public boolean isSetParagraphId() {
+      return this.paragraphId != null;
+    }
+
+    public void setParagraphIdIsSet(boolean value) {
+      if (!value) {
+        this.paragraphId = null;
+      }
+    }
+
+    public int getConfigSize() {
+      return (this.config == null) ? 0 : this.config.size();
+    }
+
+    public void putToConfig(java.lang.String key, java.lang.String val) {
+      if (this.config == null) {
+        this.config = new java.util.HashMap<java.lang.String,java.lang.String>();
+      }
+      this.config.put(key, val);
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.util.Map<java.lang.String,java.lang.String> getConfig() {
+      return this.config;
+    }
+
+    public updateParagraphConfig_args setConfig(@org.apache.thrift.annotation.Nullable java.util.Map<java.lang.String,java.lang.String> config) {
+      this.config = config;
+      return this;
+    }
+
+    public void unsetConfig() {
+      this.config = null;
+    }
+
+    /** Returns true if field config is set (has been assigned a value) and false otherwise */
+    public boolean isSetConfig() {
+      return this.config != null;
+    }
+
+    public void setConfigIsSet(boolean value) {
+      if (!value) {
+        this.config = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+      switch (field) {
+      case NOTE_ID:
+        if (value == null) {
+          unsetNoteId();
+        } else {
+          setNoteId((java.lang.String)value);
+        }
+        break;
+
+      case PARAGRAPH_ID:
+        if (value == null) {
+          unsetParagraphId();
+        } else {
+          setParagraphId((java.lang.String)value);
+        }
+        break;
+
+      case CONFIG:
+        if (value == null) {
+          unsetConfig();
+        } else {
+          setConfig((java.util.Map<java.lang.String,java.lang.String>)value);
+        }
+        break;
+
+      }
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case NOTE_ID:
+        return getNoteId();
+
+      case PARAGRAPH_ID:
+        return getParagraphId();
+
+      case CONFIG:
+        return getConfig();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case NOTE_ID:
+        return isSetNoteId();
+      case PARAGRAPH_ID:
+        return isSetParagraphId();
+      case CONFIG:
+        return isSetConfig();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof updateParagraphConfig_args)
+        return this.equals((updateParagraphConfig_args)that);
+      return false;
+    }
+
+    public boolean equals(updateParagraphConfig_args that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_noteId = true && this.isSetNoteId();
+      boolean that_present_noteId = true && that.isSetNoteId();
+      if (this_present_noteId || that_present_noteId) {
+        if (!(this_present_noteId && that_present_noteId))
+          return false;
+        if (!this.noteId.equals(that.noteId))
+          return false;
+      }
+
+      boolean this_present_paragraphId = true && this.isSetParagraphId();
+      boolean that_present_paragraphId = true && that.isSetParagraphId();
+      if (this_present_paragraphId || that_present_paragraphId) {
+        if (!(this_present_paragraphId && that_present_paragraphId))
+          return false;
+        if (!this.paragraphId.equals(that.paragraphId))
+          return false;
+      }
+
+      boolean this_present_config = true && this.isSetConfig();
+      boolean that_present_config = true && that.isSetConfig();
+      if (this_present_config || that_present_config) {
+        if (!(this_present_config && that_present_config))
+          return false;
+        if (!this.config.equals(that.config))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((isSetNoteId()) ? 131071 : 524287);
+      if (isSetNoteId())
+        hashCode = hashCode * 8191 + noteId.hashCode();
+
+      hashCode = hashCode * 8191 + ((isSetParagraphId()) ? 131071 : 524287);
+      if (isSetParagraphId())
+        hashCode = hashCode * 8191 + paragraphId.hashCode();
+
+      hashCode = hashCode * 8191 + ((isSetConfig()) ? 131071 : 524287);
+      if (isSetConfig())
+        hashCode = hashCode * 8191 + config.hashCode();
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(updateParagraphConfig_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = java.lang.Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetNoteId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = java.lang.Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetParagraphId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = java.lang.Boolean.valueOf(isSetConfig()).compareTo(other.isSetConfig());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetConfig()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.config, other.config);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("updateParagraphConfig_args(");
+      boolean first = true;
+
+      sb.append("noteId:");
+      if (this.noteId == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.noteId);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("paragraphId:");
+      if (this.paragraphId == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.paragraphId);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("config:");
+      if (this.config == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.config);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class updateParagraphConfig_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public updateParagraphConfig_argsStandardScheme getScheme() {
+        return new updateParagraphConfig_argsStandardScheme();
+      }
+    }
+
+    private static class updateParagraphConfig_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<updateParagraphConfig_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, updateParagraphConfig_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // NOTE_ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.noteId = iprot.readString();
+                struct.setNoteIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // PARAGRAPH_ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.paragraphId = iprot.readString();
+                struct.setParagraphIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 3: // CONFIG
+              if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+                {
+                  org.apache.thrift.protocol.TMap _map24 = iprot.readMapBegin();
+                  struct.config = new java.util.HashMap<java.lang.String,java.lang.String>(2*_map24.size);
+                  @org.apache.thrift.annotation.Nullable java.lang.String _key25;
+                  @org.apache.thrift.annotation.Nullable java.lang.String _val26;
+                  for (int _i27 = 0; _i27 < _map24.size; ++_i27)
+                  {
+                    _key25 = iprot.readString();
+                    _val26 = iprot.readString();
+                    struct.config.put(_key25, _val26);
+                  }
+                  iprot.readMapEnd();
+                }
+                struct.setConfigIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, updateParagraphConfig_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.noteId != null) {
+          oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+          oprot.writeString(struct.noteId);
+          oprot.writeFieldEnd();
+        }
+        if (struct.paragraphId != null) {
+          oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC);
+          oprot.writeString(struct.paragraphId);
+          oprot.writeFieldEnd();
+        }
+        if (struct.config != null) {
+          oprot.writeFieldBegin(CONFIG_FIELD_DESC);
+          {
+            oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.config.size()));
+            for (java.util.Map.Entry<java.lang.String, java.lang.String> _iter28 : struct.config.entrySet())
+            {
+              oprot.writeString(_iter28.getKey());
+              oprot.writeString(_iter28.getValue());
+            }
+            oprot.writeMapEnd();
+          }
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class updateParagraphConfig_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public updateParagraphConfig_argsTupleScheme getScheme() {
+        return new updateParagraphConfig_argsTupleScheme();
+      }
+    }
+
+    private static class updateParagraphConfig_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<updateParagraphConfig_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, updateParagraphConfig_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.isSetNoteId()) {
+          optionals.set(0);
+        }
+        if (struct.isSetParagraphId()) {
+          optionals.set(1);
+        }
+        if (struct.isSetConfig()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
+        if (struct.isSetNoteId()) {
+          oprot.writeString(struct.noteId);
+        }
+        if (struct.isSetParagraphId()) {
+          oprot.writeString(struct.paragraphId);
+        }
+        if (struct.isSetConfig()) {
+          {
+            oprot.writeI32(struct.config.size());
+            for (java.util.Map.Entry<java.lang.String, java.lang.String> _iter29 : struct.config.entrySet())
+            {
+              oprot.writeString(_iter29.getKey());
+              oprot.writeString(_iter29.getValue());
+            }
+          }
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, updateParagraphConfig_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(3);
+        if (incoming.get(0)) {
+          struct.noteId = iprot.readString();
+          struct.setNoteIdIsSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.paragraphId = iprot.readString();
+          struct.setParagraphIdIsSet(true);
+        }
+        if (incoming.get(2)) {
+          {
+            org.apache.thrift.protocol.TMap _map30 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+            struct.config = new java.util.HashMap<java.lang.String,java.lang.String>(2*_map30.size);
+            @org.apache.thrift.annotation.Nullable java.lang.String _key31;
+            @org.apache.thrift.annotation.Nullable java.lang.String _val32;
+            for (int _i33 = 0; _i33 < _map30.size; ++_i33)
+            {
+              _key31 = iprot.readString();
+              _val32 = iprot.readString();
+              struct.config.put(_key31, _val32);
+            }
+          }
+          struct.setConfigIsSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+  public static class updateParagraphConfig_result implements org.apache.thrift.TBase<updateParagraphConfig_result, updateParagraphConfig_result._Fields>, java.io.Serializable, Cloneable, Comparable<updateParagraphConfig_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("updateParagraphConfig_result");
+
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new updateParagraphConfig_resultStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new updateParagraphConfig_resultTupleSchemeFactory();
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      @org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(updateParagraphConfig_result.class, metaDataMap);
+    }
+
+    public updateParagraphConfig_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public updateParagraphConfig_result(updateParagraphConfig_result other) {
+    }
+
+    public updateParagraphConfig_result deepCopy() {
+      return new updateParagraphConfig_result(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+      switch (field) {
+      }
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof updateParagraphConfig_result)
+        return this.equals((updateParagraphConfig_result)that);
+      return false;
+    }
+
+    public boolean equals(updateParagraphConfig_result that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(updateParagraphConfig_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      return 0;
+    }
+
+    @org.apache.thrift.annotation.Nullable
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("updateParagraphConfig_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class updateParagraphConfig_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public updateParagraphConfig_resultStandardScheme getScheme() {
+        return new updateParagraphConfig_resultStandardScheme();
+      }
+    }
+
+    private static class updateParagraphConfig_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<updateParagraphConfig_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, updateParagraphConfig_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, updateParagraphConfig_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class updateParagraphConfig_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public updateParagraphConfig_resultTupleScheme getScheme() {
+        return new updateParagraphConfig_resultTupleScheme();
+      }
+    }
+
+    private static class updateParagraphConfig_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<updateParagraphConfig_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, updateParagraphConfig_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, updateParagraphConfig_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
   public static class getAllResources_args implements org.apache.thrift.TBase<getAllResources_args, getAllResources_args._Fields>, java.io.Serializable, Cloneable, Comparable<getAllResources_args>   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getAllResources_args");
 
@@ -12902,13 +13936,13 @@ public class RemoteInterpreterEventService {
             case 0: // SUCCESS
               if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
                 {
-                  org.apache.thrift.protocol.TList _list24 = iprot.readListBegin();
-                  struct.success = new java.util.ArrayList<java.lang.String>(_list24.size);
-                  @org.apache.thrift.annotation.Nullable java.lang.String _elem25;
-                  for (int _i26 = 0; _i26 < _list24.size; ++_i26)
+                  org.apache.thrift.protocol.TList _list34 = iprot.readListBegin();
+                  struct.success = new java.util.ArrayList<java.lang.String>(_list34.size);
+                  @org.apache.thrift.annotation.Nullable java.lang.String _elem35;
+                  for (int _i36 = 0; _i36 < _list34.size; ++_i36)
                   {
-                    _elem25 = iprot.readString();
-                    struct.success.add(_elem25);
+                    _elem35 = iprot.readString();
+                    struct.success.add(_elem35);
                   }
                   iprot.readListEnd();
                 }
@@ -12936,9 +13970,9 @@ public class RemoteInterpreterEventService {
           oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.success.size()));
-            for (java.lang.String _iter27 : struct.success)
+            for (java.lang.String _iter37 : struct.success)
             {
-              oprot.writeString(_iter27);
+              oprot.writeString(_iter37);
             }
             oprot.writeListEnd();
           }
@@ -12969,9 +14003,9 @@ public class RemoteInterpreterEventService {
         if (struct.isSetSuccess()) {
           {
             oprot.writeI32(struct.success.size());
-            for (java.lang.String _iter28 : struct.success)
+            for (java.lang.String _iter38 : struct.success)
             {
-              oprot.writeString(_iter28);
+              oprot.writeString(_iter38);
             }
           }
         }
@@ -12983,13 +14017,13 @@ public class RemoteInterpreterEventService {
         java.util.BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TList _list29 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-            struct.success = new java.util.ArrayList<java.lang.String>(_list29.size);
-            @org.apache.thrift.annotation.Nullable java.lang.String _elem30;
-            for (int _i31 = 0; _i31 < _list29.size; ++_i31)
+            org.apache.thrift.protocol.TList _list39 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+            struct.success = new java.util.ArrayList<java.lang.String>(_list39.size);
+            @org.apache.thrift.annotation.Nullable java.lang.String _elem40;
+            for (int _i41 = 0; _i41 < _list39.size; ++_i41)
             {
-              _elem30 = iprot.readString();
-              struct.success.add(_elem30);
+              _elem40 = iprot.readString();
+              struct.success.add(_elem40);
             }
           }
           struct.setSuccessIsSet(true);
@@ -15475,14 +16509,14 @@ public class RemoteInterpreterEventService {
             case 0: // SUCCESS
               if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
                 {
-                  org.apache.thrift.protocol.TList _list32 = iprot.readListBegin();
-                  struct.success = new java.util.ArrayList<ParagraphInfo>(_list32.size);
-                  @org.apache.thrift.annotation.Nullable ParagraphInfo _elem33;
-                  for (int _i34 = 0; _i34 < _list32.size; ++_i34)
+                  org.apache.thrift.protocol.TList _list42 = iprot.readListBegin();
+                  struct.success = new java.util.ArrayList<ParagraphInfo>(_list42.size);
+                  @org.apache.thrift.annotation.Nullable ParagraphInfo _elem43;
+                  for (int _i44 = 0; _i44 < _list42.size; ++_i44)
                   {
-                    _elem33 = new ParagraphInfo();
-                    _elem33.read(iprot);
-                    struct.success.add(_elem33);
+                    _elem43 = new ParagraphInfo();
+                    _elem43.read(iprot);
+                    struct.success.add(_elem43);
                   }
                   iprot.readListEnd();
                 }
@@ -15519,9 +16553,9 @@ public class RemoteInterpreterEventService {
           oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.success.size()));
-            for (ParagraphInfo _iter35 : struct.success)
+            for (ParagraphInfo _iter45 : struct.success)
             {
-              _iter35.write(oprot);
+              _iter45.write(oprot);
             }
             oprot.writeListEnd();
           }
@@ -15560,9 +16594,9 @@ public class RemoteInterpreterEventService {
         if (struct.isSetSuccess()) {
           {
             oprot.writeI32(struct.success.size());
-            for (ParagraphInfo _iter36 : struct.success)
+            for (ParagraphInfo _iter46 : struct.success)
             {
-              _iter36.write(oprot);
+              _iter46.write(oprot);
             }
           }
         }
@@ -15577,14 +16611,14 @@ public class RemoteInterpreterEventService {
         java.util.BitSet incoming = iprot.readBitSet(2);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TList _list37 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-            struct.success = new java.util.ArrayList<ParagraphInfo>(_list37.size);
-            @org.apache.thrift.annotation.Nullable ParagraphInfo _elem38;
-            for (int _i39 = 0; _i39 < _list37.size; ++_i39)
+            org.apache.thrift.protocol.TList _list47 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+            struct.success = new java.util.ArrayList<ParagraphInfo>(_list47.size);
+            @org.apache.thrift.annotation.Nullable ParagraphInfo _elem48;
+            for (int _i49 = 0; _i49 < _list47.size; ++_i49)
             {
-              _elem38 = new ParagraphInfo();
-              _elem38.read(iprot);
-              struct.success.add(_elem38);
+              _elem48 = new ParagraphInfo();
+              _elem48.read(iprot);
+              struct.success.add(_elem48);
             }
           }
           struct.setSuccessIsSet(true);
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 590598a..892bd06 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   NO_OP(1),
   ANGULAR_OBJECT_ADD(2),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 782fe48..fb8d25e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
index 49e6ce2..e9fca08 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 48f57f3..9f80238 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RemoteInterpreterService {
 
   public interface Iface {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
index a7622de..e424597 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class RunParagraphsEvent implements org.apache.thrift.TBase<RunParagraphsEvent, RunParagraphsEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RunParagraphsEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
index 75f6098..90575c6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class ServiceException extends org.apache.thrift.TException implements org.apache.thrift.TBase<ServiceException, ServiceException._Fields>, java.io.Serializable, Cloneable, Comparable<ServiceException> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ServiceException");
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/WebUrlInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/WebUrlInfo.java
index 9050e54..f5a9e24 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/WebUrlInfo.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/WebUrlInfo.java
@@ -24,7 +24,7 @@
 package org.apache.zeppelin.interpreter.thrift;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-26")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-28")
 public class WebUrlInfo implements org.apache.thrift.TBase<WebUrlInfo, WebUrlInfo._Fields>, java.io.Serializable, Cloneable, Comparable<WebUrlInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("WebUrlInfo");
 
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
index 611c62f..479ee94 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
@@ -123,6 +123,7 @@ service RemoteInterpreterEventService {
 
   void sendWebUrl(1: WebUrlInfo weburlInfo);
   void sendParagraphInfo(1: string intpGroupId, 2: string json);
+  void updateParagraphConfig(1: string noteId, 2: string paragraphId, 3: map<string, string> config);
 
   list<string> getAllResources(1: string intpGroupId);
   binary getResource(1: string resourceIdJson);
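Note on the new RPC above: updateParagraphConfig is the transport that lets an interpreter process push paragraph config back to the Zeppelin server, which is how the Flink interpreter persists the latest checkpoint path. Below is a minimal sketch of an interpreter-side caller; it assumes the RemoteInterpreterEventClient wrapper added in this commit exposes a method with the same signature as the Thrift service, and the config key name is hypothetical:

    // Sketch only: pushes a checkpoint path into the paragraph's config via the
    // new updateParagraphConfig RPC. The wrapper method on the event client and
    // the "latest_checkpoint_path" key are assumptions for illustration.
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.zeppelin.interpreter.InterpreterContext;

    public class CheckpointConfigUpdater {

      public void saveLatestCheckpoint(InterpreterContext context, String checkpointPath) {
        Map<String, String> config = new HashMap<>();
        config.put("latest_checkpoint_path", checkpointPath);   // hypothetical key

        // The event client is obtained from the interpreter context and forwards to
        // updateParagraphConfig(noteId, paragraphId, config) on the Zeppelin server.
        context.getIntpEventClient()
               .updateParagraphConfig(context.getNoteId(), context.getParagraphId(), config);
      }
    }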
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index a36a8d0..a1b27b5 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -335,7 +335,7 @@ public class NotebookService {
       p.settings.setParams(params);
     }
     if (config != null && !config.isEmpty()) {
-      p.setConfig(config);
+      p.mergeConfig(config);
     }
 
     if (note.isPersonalizedMode()) {
@@ -347,7 +347,7 @@ public class NotebookService {
         p.settings.setParams(params);
       }
       if (config != null && !config.isEmpty()) {
-        p.setConfig(config);
+        p.mergeConfig(config);
       }
     }
 
@@ -605,13 +605,13 @@ public class NotebookService {
     }
 
     p.settings.setParams(params);
-    p.setConfig(config);
+    p.mergeConfig(config);
     p.setTitle(title);
     p.setText(text);
     if (note.isPersonalizedMode()) {
       p = p.getUserParagraph(context.getAutheInfo().getUser());
       p.settings.setParams(params);
-      p.setConfig(config);
+      p.mergeConfig(config);
       p.setTitle(title);
       p.setText(text);
     }
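The switch from setConfig(config) to mergeConfig(config) above is what keeps server-side config entries (such as a persisted checkpoint path) from being wiped every time the front end submits a paragraph run. A small, self-contained sketch of the difference, assuming the config is a plain Map<String, Object> as in Paragraph; the key name is hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigMergeSketch {
      public static void main(String[] args) {
        Map<String, Object> stored = new HashMap<>();
        stored.put("latest_checkpoint_path", "hdfs:///chk-42"); // persisted by the interpreter
        stored.put("editorHide", false);

        Map<String, Object> fromFrontend = new HashMap<>();
        fromFrontend.put("editorHide", true);

        // setConfig-style: replace the whole map -> checkpoint path is lost
        Map<String, Object> replaced = new HashMap<>(fromFrontend);

        // mergeConfig-style: overlay the incoming values, keep everything else
        Map<String, Object> merged = new HashMap<>(stored);
        merged.putAll(fromFrontend);

        System.out.println(replaced.containsKey("latest_checkpoint_path")); // false
        System.out.println(merged.containsKey("latest_checkpoint_path"));   // true
      }
    }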
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 2d4d861..9ebd83c 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -35,8 +35,6 @@ import org.apache.commons.httpclient.methods.PutMethod;
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.plugin.PluginManager;
 import org.apache.zeppelin.utils.TestUtils;
@@ -259,12 +257,6 @@ public abstract class AbstractTestRestApi {
       }
 
       LOG.info("Zeppelin Server is started.");
-
-      // set up spark interpreter
-      String sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
-      InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
-      InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
-      interpreterSetting.setProperty("SPARK_HOME", sparkHome);
     }
   }
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 3813ce0..5f67889 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -41,7 +41,6 @@ import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
 import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
 import org.apache.zeppelin.interpreter.thrift.ServiceException;
 import org.apache.zeppelin.interpreter.thrift.WebUrlInfo;
@@ -51,6 +50,7 @@ import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourceId;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit;
 public class RemoteInterpreterEventServer implements RemoteInterpreterEventService.Iface {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventServer.class);
+  private static final Gson GSON = new Gson();
 
   private String portRange;
   private int port;
@@ -80,7 +81,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
   private AppendOutputRunner runner;
   private final RemoteInterpreterProcessListener listener;
   private final ApplicationEventListener appListener;
-  private final Gson gson = new Gson();
+
 
   public RemoteInterpreterEventServer(ZeppelinConfiguration zConf,
                                       InterpreterSettingManager interpreterSettingManager) {
@@ -261,7 +262,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
     InterpreterGroup interpreterGroup =
         interpreterSettingManager.getInterpreterGroupById(intpGroupId);
     if (interpreterGroup == null) {
-      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+      LOGGER.warn("Invalid InterpreterGroupId: " + intpGroupId);
+      return;
     }
     interpreterGroup.getAngularObjectRegistry().add(angularObject.getName(),
         angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
@@ -333,7 +335,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
       throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
     }
 
-    Map<String, String> paraInfos = gson.fromJson(json,
+    Map<String, String> paraInfos = GSON.fromJson(json,
         new TypeToken<Map<String, String>>() {
         }.getType());
     String noteId = paraInfos.get("noteId");
@@ -514,4 +516,17 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
     }
     return resourceSet;
   }
+
+  @Override
+  public void updateParagraphConfig(String noteId,
+                                    String paragraphId,
+                                    Map<String, String> config) throws TException {
+    try {
+      Note note = interpreterSettingManager.getNotebook().getNote(noteId);
+      note.getParagraph(paragraphId).updateConfig(config);
+      interpreterSettingManager.getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS);
+    } catch (Exception e) {
+      LOGGER.error("Fail to updateParagraphConfig", e);
+    }
+  }
 }
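On the server side, updateParagraphConfig merges the incoming map into the paragraph and saves the note, so the checkpoint path survives interpreter restarts. The sketch below shows how the interpreter side could consume it on the next run: if the paragraph is submitted with resumeFromLatestCheckpoint set to true, the saved path is handed to Flink via the standard execution.savepoint.path option. The key name and the exact flow are assumptions for illustration, not lifted from FlinkScalaInterpreter.scala:

    import org.apache.flink.configuration.Configuration;
    import org.apache.zeppelin.interpreter.InterpreterContext;

    public class ResumeFromCheckpointSketch {

      public void configure(InterpreterContext context, Configuration flinkConf) {
        boolean resume = Boolean.parseBoolean(
            context.getLocalProperties().getOrDefault("resumeFromLatestCheckpoint", "false"));
        // hypothetical key, written earlier by the interpreter via updateParagraphConfig
        Object checkpointPath = context.getConfig().get("latest_checkpoint_path");

        if (resume && checkpointPath != null) {
          // Flink restores the job from this path on submission
          flinkConf.setString("execution.savepoint.path", checkpointPath.toString());
        }
      }
    }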
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 8c899cf..7f323e0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
@@ -46,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -56,7 +54,7 @@ import java.util.Properties;
  */
 public class RemoteInterpreter extends Interpreter {
   private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
-  private static final Gson gson = new Gson();
+  private static final Gson GSON = new Gson();
 
 
   private String className;
@@ -216,7 +214,7 @@ public class RemoteInterpreter extends Interpreter {
     return interpreterProcess.callRemoteFunction(client -> {
           RemoteInterpreterResult remoteResult = client.interpret(
               sessionId, className, st, convert(context));
-          Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+          Map<String, Object> remoteConfig = (Map<String, Object>) GSON.fromJson(
               remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
               }.getType());
           context.getConfig().clear();
@@ -378,8 +376,8 @@ public class RemoteInterpreter extends Interpreter {
   private RemoteInterpreterContext convert(InterpreterContext ic) {
     return new RemoteInterpreterContext(ic.getNoteId(), ic.getNoteName(), ic.getParagraphId(),
         ic.getReplName(), ic.getParagraphTitle(), ic.getParagraphText(),
-        gson.toJson(ic.getAuthenticationInfo()), gson.toJson(ic.getConfig()), ic.getGui().toJson(),
-        gson.toJson(ic.getNoteGui()),
+        GSON.toJson(ic.getAuthenticationInfo()), GSON.toJson(ic.getConfig()), ic.getGui().toJson(),
+        GSON.toJson(ic.getNoteGui()),
         ic.getLocalProperties());
   }
 
@@ -410,7 +408,7 @@ public class RemoteInterpreter extends Interpreter {
       final java.lang.reflect.Type registryType = new TypeToken<Map<String,
           Map<String, AngularObject>>>() {
       }.getType();
-      client.angularRegistryPush(gson.toJson(registry, registryType));
+      client.angularRegistryPush(GSON.toJson(registry, registryType));
     }
   }
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index e8c02ac..ce80523 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -124,7 +124,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     this.note = p2.note;
     this.settings.setParams(Maps.newHashMap(p2.settings.getParams()));
     this.settings.setForms(Maps.newLinkedHashMap(p2.settings.getForms()));
-    this.setConfig(Maps.newHashMap(p2.config));
+    this.setConfig(Maps.newHashMap(p2.getConfig()));
     this.setAuthenticationInfo(p2.getAuthenticationInfo());
     this.title = p2.title;
     this.text = p2.text;
@@ -553,7 +553,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   // NOTE: function setConfig(...) will overwrite all configuration
   // Merge configuration, you need to use function mergeConfig(...)
   public void setConfig(Map<String, Object> config) {
-    this.config = config;
+    this.config = Maps.newHashMap(config);
   }
 
   // [ZEPPELIN-3919] Paragraph config default value can be customized
@@ -571,6 +571,10 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     this.config.putAll(newConfig);
   }
 
+  public void updateConfig(Map<String, String> newConfig) {
+    this.config.putAll(newConfig);
+  }
+
   public void setReturn(InterpreterResult value, Throwable t) {
     setResult(value);
     setException(t);
@@ -700,6 +704,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     }
   }
 
+  @VisibleForTesting
+  public void waitUntilRunning() throws Exception {
+    while (!isRunning()) {
+      LOGGER.debug("Waiting for paragraph to be running");
+      Thread.sleep(1000);
+    }
+  }
+
   private GUI getNoteGui() {
     GUI gui = new GUI();
     gui.setParams(this.note.getNoteParams());