You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/09 07:06:02 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4981]. Zeppelin Client API

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 0d9b4dd  [ZEPPELIN-4981]. Zeppelin Client API
0d9b4dd is described below

commit 0d9b4dd2d38c89fafce32b65f3d38f0974700fab
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Sep 8 15:18:28 2020 +0800

    [ZEPPELIN-4981]. Zeppelin Client API
    
    ### What is this PR for?
    
    This is the initial PR fo introducing zeppelin client api. You can check this google doc for the overall design. https://docs.google.com/document/d/1bLLKKxleZlZpP9EFJlLLkJKwDBps-RNvzNwh3LFZWZ4/edit?usp=sharing
    In this PR, I add 2 modules: zeppelin-client, zeppelin-client-examples.
    
    You can take a look at the module zeppelin-client-examples to see how we can use zeppelin client api.
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4981
    
    ### How should this be tested?
    * Manually tested and Unit test is added.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3887 from zjffdu/ZEPPELIN-4981 and squashes the following commits:
    
    9b8ff4244 [Jeff Zhang] update travis
    8eddf3e09 [Jeff Zhang] use ExceptionMapper
    bb0537129 [Jeff Zhang] update
    4205c1c5e [Jeff Zhang] code refactoring
    53fe1c13d [Jeff Zhang] update examples
    487cb8c05 [Jeff Zhang] minor update
    d7d11f77c [Jeff Zhang] minor update
    f5e1e2e76 [Jeff Zhang] add session reconnect
    bc723e766 [Jeff Zhang] minor update
    7fbd48147 [Jeff Zhang] minor update
    fe22986ff [Jeff Zhang] add sessionId to ZSession
    c7701765a [Jeff Zhang] add session status api
    1dbc414ba [Jeff Zhang] move classes to websocket folder
    0e43ce1f9 [Jeff Zhang] add more examples
    39d230b29 [Jeff Zhang] fix ut
    d4bbe9522 [Jeff Zhang] add zeppelin client examples
    5ca49953a [Jeff Zhang] update
    5611b7b0e [Jeff Zhang] resolve gson dependency issue
    1aa1d68da [Jeff Zhang] [ZEPPELIN-4981]. Zeppelin Client API (Zeppelin SDK)
    
    (cherry picked from commit f11e6f35bd4da1a73ef5d96b548216ee161c439a)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .travis.yml                                        |  21 +
 pom.xml                                            |   2 +
 spark/pom.xml                                      |   2 +
 zeppelin-client-examples/pom.xml                   |  83 +++
 .../client/examples/FlinkAdvancedExample.java      | 111 +++
 .../client/examples/FlinkAdvancedExample2.java     | 110 +++
 .../zeppelin/client/examples/FlinkExample.java     | 103 +++
 .../zeppelin/client/examples/HiveExample.java      |  78 +++
 .../zeppelin/client/examples/PrestoExample.java    |  79 +++
 .../zeppelin/client/examples/PythonExample.java    |  96 +++
 .../apache/zeppelin/client/examples/RExample.java  |  93 +++
 .../client/examples/SparkAdvancedExample.java      | 102 +++
 .../zeppelin/client/examples/SparkExample.java     | 112 +++
 .../client/examples/ZeppelinClientExample.java     |  71 ++
 .../client/examples/ZeppelinClientExample2.java    |  71 ++
 .../src/main/resources/init_stream.scala           |  63 ++
 .../src/main/resources/log4j.properties            |  22 +
 zeppelin-client/pom.xml                            | 113 ++++
 .../org/apache/zeppelin/client/ClientConfig.java   |  62 ++
 .../org/apache/zeppelin/client/ExecuteResult.java  |  79 +++
 .../org/apache/zeppelin/client/NoteResult.java     |  40 +-
 .../apache/zeppelin/client/ParagraphResult.java    | 122 ++++
 .../java/org/apache/zeppelin/client/Result.java    |  42 +-
 .../org/apache/zeppelin/client/SessionInfo.java    |  95 +++
 .../java/org/apache/zeppelin/client/Status.java    |  42 +-
 .../java/org/apache/zeppelin/client/ZSession.java  | 504 ++++++++++++++
 .../org/apache/zeppelin/client/ZeppelinClient.java | 747 +++++++++++++++++++++
 .../client/websocket/AbstractMessageHandler.java   |  87 +++
 .../client/websocket/CompositeMessageHandler.java  |  59 ++
 .../client/websocket/JsonSerializable.java         |  27 +-
 .../apache/zeppelin/client/websocket/Message.java  | 288 ++++++++
 .../zeppelin/client/websocket/MessageHandler.java  |  25 +-
 .../client/websocket/SimpleMessageHandler.java     |  30 +-
 .../client/websocket/StatementMessageHandler.java  |  45 +-
 .../client/websocket/ZeppelinWebSocketClient.java  | 122 ++++
 .../src/test/resources/log4j.properties            |   3 -
 zeppelin-interpreter-integration/pom.xml           |  12 +
 .../integration/ZSessionIntegrationTest.java       | 507 ++++++++++++++
 .../integration/ZeppelinClientIntegrationTest.java | 378 +++++++++++
 .../ZeppelinClientWithAuthIntegrationTest.java     | 107 +++
 .../src/test/resources/log4j.properties            |   4 +-
 .../zeppelin/interpreter/ExecutionContext.java     |  13 +-
 .../interpreter/ExecutionContextBuilder.java       |   9 +-
 .../apache/zeppelin/rest/InterpreterRestApi.java   |  30 +-
 .../org/apache/zeppelin/rest/NotebookRestApi.java  | 189 +++---
 .../org/apache/zeppelin/rest/SessionManager.java   | 175 +++++
 .../org/apache/zeppelin/rest/SessionRestApi.java   | 106 +++
 .../exception/WebApplicationExceptionMapper.java   |  12 +-
 .../zeppelin/rest/message/NewNoteRequest.java      |  16 +-
 .../rest/message/RestartInterpreterRequest.java    |   8 +-
 .../apache/zeppelin/rest/message/SessionInfo.java  |  86 +++
 .../rest/message/gson/ExceptionSerializer.java     |   3 +
 .../apache/zeppelin/service/NotebookService.java   |  55 +-
 .../org/apache/zeppelin/socket/NotebookServer.java |   4 +-
 .../zeppelin/socket/NotebookWebSocketCreator.java  |   1 +
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |   1 +
 .../apache/zeppelin/rest/NotebookRestApiTest.java  |  21 +-
 .../zeppelin/rest/NotebookSecurityRestApiTest.java |   8 +-
 .../apache/zeppelin/rest/ZeppelinRestApiTest.java  |   2 +-
 .../zeppelin/service/NotebookServiceTest.java      |  20 +-
 .../zeppelin/interpreter/InterpreterSetting.java   |   8 +
 .../interpreter/RemoteInterpreterEventServer.java  |   3 +-
 .../remote/RemoteInterpreterProcess.java           |  10 +
 .../java/org/apache/zeppelin/notebook/Note.java    |  14 +-
 .../org/apache/zeppelin/notebook/Paragraph.java    |  27 +-
 65 files changed, 5273 insertions(+), 307 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index faf1b0c..7c50f9b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -215,6 +215,27 @@ jobs:
         - mvn test -DskipRat -pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g') -Pscala-2.10 -B
 
 
+    - name: "Test zeppelin-client integration test"
+      jdk: "openjdk8"
+      dist: xenial
+      before_install:
+        - export PYTHON=3
+        - export R=true
+        - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
+        - bash -x ./testing/install_external_dependencies.sh
+        - source ~/.environ
+      install:
+        - mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown,flink/interpreter,jdbc,shell -am
+        - mvn clean package -T 2C -pl zeppelin-plugins -amd -B
+      before_script:
+        - echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
+        - echo "export SPARK_PRINT_LAUNCH_COMMAND=true" >> conf/zeppelin-env.sh
+        - export SPARK_PRINT_LAUNCH_COMMAND=true
+        - tail conf/zeppelin-env.sh
+      script:
+        - mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
+
+
     - name: "Test flink 1.10 & flink integration test"
       jdk: "openjdk8"
       dist: xenial
diff --git a/pom.xml b/pom.xml
index 47e6e02..78f66de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,8 @@
     <module>geode</module>
     <module>ksql</module>
     <module>sparql</module>
+    <module>zeppelin-client</module>
+    <module>zeppelin-client-examples</module>
     <module>zeppelin-web</module>
     <module>zeppelin-server</module>
     <module>zeppelin-jupyter</module>
diff --git a/spark/pom.xml b/spark/pom.xml
index 25301b2..022c3cd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -42,6 +42,8 @@
 
         <!-- spark versions -->
         <spark.version>2.4.5</spark.version>
+        <protobuf.version>2.5.0</protobuf.version>
+        <py4j.version>0.10.7</py4j.version>
         <spark.scala.version>2.11.12</spark.scala.version>
         <spark.scala.binary.version>2.11</spark.scala.binary.version>
 
diff --git a/zeppelin-client-examples/pom.xml b/zeppelin-client-examples/pom.xml
new file mode 100644
index 0000000..8817635
--- /dev/null
+++ b/zeppelin-client-examples/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-client-examples</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Client Examples</name>
+  <description>Zeppelin Client Examples</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java
new file mode 100644
index 0000000..64b6184
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of run flink streaming sql via session api.
+ * You can capture the streaming output via SimpleMessageHandler
+ */
+public class FlinkAdvancedExample {
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("flink")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      // if MessageHandler is specified, then websocket is enabled.
+      // you can get continuous output from Zeppelin via websocket.
+      session.start(new SimpleMessageHandler());
+      System.out.println("Flink Web UI: " + session.getWeburl());
+
+      String code = "benv.fromElements(1,2,3,4,5,6,7,8,9,10).map(e=> {Thread.sleep(1000); e}).print()";
+      System.out.println("Submit code: " + code);
+      // use submit to run flink code in non-blocking way.
+      ExecuteResult result = session.submit(code);
+      System.out.println("Job status: " + result.getStatus());
+      while(!result.getStatus().isCompleted()) {
+        result = session.queryStatement(result.getStatementId());
+        System.out.println("Job status: " + result.getStatus() + ", progress: " + result.getProgress());
+        Thread.sleep(1000);
+      }
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      System.out.println("Submit code: " + code);
+      result = session.submit("benv.fromElements(1,2,3,4,5,6,7,8,9,10).map(e=> {Thread.sleep(1000); e}).print()");
+      System.out.println("Job status: " + result.getStatus());
+      result = session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      code = "for(i <- 1 to 10) {\n" +
+              "   Thread.sleep(1000)\n" +
+              "   println(i)\n" +
+              "}";
+      System.out.println("Submit code: " + code);
+      result = session.execute(code);
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
+      result = session.execute(initCode);
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      // run flink ssql
+      Map<String, String> localProperties = new HashMap<>();
+      localProperties.put("type", "update");
+      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+      session.waitUntilFinished(result.getStatementId());
+
+      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+      session.waitUntilRunning(result.getStatementId());
+      Thread.sleep(10 * 1000);
+      System.out.println("Try to cancel statement: " + result.getStatementId());
+      session.cancel(result.getStatementId());
+      session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java
new file mode 100644
index 0000000..787bd13
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.websocket.CompositeMessageHandler;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.StatementMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of run flink streaming sql via session api.
+ * You can capture the streaming output via CompositeMessageHandler.
+ * You can specify StatementMessageHandler(MyStatementMessageHandler1, MyStatementMessageHandler2)
+ * for each flink job.
+ */
+public class FlinkAdvancedExample2 {
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("flink")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
+      // otherwise you have to use a global MessageHandler.
+      session.start(new CompositeMessageHandler());
+      System.out.println("Flink Web UI: " + session.getWeburl());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
+      ExecuteResult result = session.execute(initCode);
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      // run flink ssql
+      Map<String, String> localProperties = new HashMap<>();
+      localProperties.put("type", "update");
+      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
+              new MyStatementMessageHandler1());
+      session.waitUntilFinished(result.getStatementId());
+
+      result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
+              new MyStatementMessageHandler2());
+      session.waitUntilFinished(result.getStatementId());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  public static class MyStatementMessageHandler1 implements StatementMessageHandler {
+
+    @Override
+    public void onStatementAppendOutput(String statementId, int index, String output) {
+      System.out.println("MyStatementMessageHandler1, append output: " + output);
+    }
+
+    @Override
+    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+      System.out.println("MyStatementMessageHandler1, update output: " + output);
+    }
+  }
+
+  public static class MyStatementMessageHandler2 implements StatementMessageHandler {
+
+    @Override
+    public void onStatementAppendOutput(String statementId, int index, String output) {
+      System.out.println("MyStatementMessageHandler2, append output: " + output);
+    }
+
+    @Override
+    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+      System.out.println("MyStatementMessageHandler2, update output: " + output);
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java
new file mode 100644
index 0000000..562156f
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of run flink code (scala, sql, python) via session api.
+ */
+public class FlinkExample {
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("flink")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start();
+      System.out.println("Flink Web UI: " + session.getWeburl());
+
+      // scala (single result)
+      ExecuteResult result = session.execute("benv.fromElements(1,2,3).print()");
+      System.out.println("Result: " + result.getResults().get(0).getData());
+
+      // scala (multiple result)
+      result = session.execute("val data = benv.fromElements(1,2,3).map(e=>(e, e * 2))\n" +
+              "data.print()\n" +
+              "z.show(data)");
+
+      // The first result is text output
+      System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData() );
+      // The second result is table output
+      System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
+              ", data: " + result.getResults().get(1).getData() );
+      System.out.println("Flink Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));
+
+      // error output
+      result = session.execute("1/0");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // pyflink
+      result = session.execute("pyflink", "type(b_env)");
+      System.out.println("benv: " + result.getResults().get(0).getData());
+      // matplotlib
+      result = session.execute("ipyflink", "%matplotlib inline\n" +
+              "import matplotlib.pyplot as plt\n" +
+              "plt.plot([1,2,3,4])\n" +
+              "plt.ylabel('some numbers')\n" +
+              "plt.show()");
+      System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // flink sql
+      result = session.execute("ssql", "show tables");
+      System.out.println("Flink tables: " + result.getResults().get(0).getData());
+
+      // flink invalid sql
+      result = session.execute("bsql", "select * from unknown_table");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java
new file mode 100644
index 0000000..1e14a86
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Basic example of run hive sql via session api.
+ * And you can capture the job progress info via SimpleMessageHandler.
+ */
+public class HiveExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("hive")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start(new SimpleMessageHandler());
+
+      // single sql
+      ExecuteResult result = session.execute("show databases");
+      System.out.println("show database result : " + result.getResults().get(0).getData());
+
+      // multiple sql
+      result = session.execute("use tpch_text_5;\nshow tables");
+      System.out.println("show tables result: " + result.getResults().get(0).getData());
+
+      // select, you can see the hive sql job progress via SimpleMessageHandler
+      result = session.execute("select count(1) from lineitem");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // invalid sql
+      result = session.execute("select * from unknown_table");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java
new file mode 100644
index 0000000..5373b00
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of run presto sql via session api.
+ */
+public class PrestoExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("presto")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start(new SimpleMessageHandler());
+
+      // single sql
+      ExecuteResult result = session.execute("show schemas");
+      System.out.println("show schemas result : " + result.getResults().get(0).getData());
+
+      // multiple sql
+      result = session.execute("use tpch_text_5;\nshow tables");
+      System.out.println("show tables result: " + result.getResults().get(0).getData());
+
+      // select
+      result = session.execute("select count(1) from lineitem");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // invalid sql
+      result = session.execute("select * from unknown_table");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java
new file mode 100644
index 0000000..6a17ffb
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Basic example of run python code via session api.
+ */
+public class PythonExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("python")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start(new SimpleMessageHandler());
+
+      // single statement
+      ExecuteResult result = session.execute("print('hello world')");
+      System.out.println(result.getResults().get(0).getData());
+
+      // multiple statement
+      result = session.execute("print('hello world')\nprint('hello world2')");
+      System.out.println(result.getResults().get(0).getData());
+
+      // error output
+      result = session.execute("1/0");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // matplotlib
+      result = session.execute("ipython", "%matplotlib inline\n" +
+              "import matplotlib.pyplot as plt\n" +
+              "plt.plot([1,2,3,4])\n" +
+              "plt.ylabel('some numbers')\n" +
+              "plt.show()");
+      System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // show pandas dataframe
+      result = session.execute("ipython", "import pandas as pd\n" +
+              "df = pd.DataFrame({'name':['a','b','c'], 'count':[12,24,18]})\n" +
+              "z.show(df)");
+      System.out.println("Pandas dataframe result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // streaming output
+      result = session.execute("import time\n" +
+              "for i in range(1,10):\n" +
+              "    print(i)\n" +
+              "    time.sleep(1)");
+      System.out.println("Python streaming result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java
new file mode 100644
index 0000000..1407374
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of run r code via session api.
+ */
+public class RExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("r")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start(new SimpleMessageHandler());
+
+      // single statement
+      ExecuteResult result = session.execute("bare <- c(1, 2.5, 4)\n" +
+              "print(bare)");
+      System.out.println(result.getResults().get(0).getData());
+
+      // error output
+      result = session.execute("1/0");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // R plotting
+      result = session.execute("ir", "pairs(iris)");
+      System.out.println("R plotting result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // ggplot2
+      result = session.execute("ir", "library(ggplot2)\n" +
+              "ggplot(mpg, aes(displ, hwy, colour = class)) + \n" +
+              "  geom_point()");
+      System.out.println("ggplot2 plotting result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // googlevis
+      result = session.execute("ir", "library(googleVis)\n" +
+              "df=data.frame(country=c(\"US\", \"GB\", \"BR\"), \n" +
+              "              val1=c(10,13,14), \n" +
+              "              val2=c(23,12,32))\n" +
+              "Bar <- gvisBarChart(df)\n" +
+              "print(Bar, tag = 'chart')");
+      System.out.println("googlevis plotting result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java
new file mode 100644
index 0000000..ec0933f
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of run spark code via session api.
+ */
+public class SparkAdvancedExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+      intpProperties.put("spark.master", "local[*]");
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("spark")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      // if MessageHandler is specified, then websocket is enabled.
+      // you can get continuous output from Zeppelin via websocket.
+      session.start(new SimpleMessageHandler());
+      System.out.println("Spark Web UI: " + session.getWeburl());
+
+      String code = "sc.range(1,10).map(e=> {Thread.sleep(2000); e}).sum()";
+      System.out.println("Submit code: " + code);
+      // use submit to run spark code in non-blocking way.
+      ExecuteResult result = session.submit(code);
+      System.out.println("Job status: " + result.getStatus());
+      while(!result.getStatus().isCompleted()) {
+        result = session.queryStatement(result.getStatementId());
+        System.out.println("Job status: " + result.getStatus() + ", progress: " + result.getProgress());
+        Thread.sleep(1000);
+      }
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      System.out.println("Submit code: " + code);
+      result = session.submit("sc.range(1,10).map(e=> {Thread.sleep(2000); e}).sum()");
+      System.out.println("Job status: " + result.getStatus());
+      result = session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      System.out.println("Submit code: " + code);
+      result = session.submit("sc.range(1,10).map(e=> {Thread.sleep(2000); e}).sum()");
+      System.out.println("Job status: " + result.getStatus());
+      session.waitUntilRunning(result.getStatementId());
+      System.out.println("Try to cancel statement: " + result.getStatementId());
+      session.cancel(result.getStatementId());
+      result = session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      System.out.println("-----------------------------------------------------------------------------");
+      code = "for(i <- 1 to 10) {\n" +
+              "   Thread.sleep(1000)\n" +
+              "   println(i)\n" +
+              "}";
+      System.out.println("Submit code: " + code);
+      result = session.execute(code);
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java
new file mode 100644
index 0000000..001c603
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of run spark code (scala, sql, python, r) via session api.
+ */
+public class SparkExample {
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+      intpProperties.put("spark.master", "local[*]");
+
+      session = ZSession.builder()
+              .setClientConfig(clientConfig)
+              .setInterpreter("spark")
+              .setIntpProperties(intpProperties)
+              .build();
+
+      session.start();
+      System.out.println("Spark Web UI: " + session.getWeburl());
+
+      // scala (single result)
+      ExecuteResult result = session.execute("println(sc.version)");
+      System.out.println("Spark Version: " + result.getResults().get(0).getData());
+
+      // scala (multiple result)
+      result = session.execute("println(sc.version)\n" +
+              "val df = spark.createDataFrame(Seq((1,\"a\"), (2,\"b\")))\n" +
+              "z.show(df)");
+
+      // The first result is text output
+      System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData() );
+      // The second result is table output
+      System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
+              ", data: " + result.getResults().get(1).getData() );
+      System.out.println("Spark Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));
+
+      // error output
+      result = session.execute("1/0");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // pyspark
+      result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\n" +
+              "df.registerTempTable('df')\n" +
+              "df.show()");
+      System.out.println("PySpark dataframe: " + result.getResults().get(0).getData());
+
+      // matplotlib
+      result = session.execute("ipyspark", "%matplotlib inline\n" +
+              "import matplotlib.pyplot as plt\n" +
+              "plt.plot([1,2,3,4])\n" +
+              "plt.ylabel('some numbers')\n" +
+              "plt.show()");
+      System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
+              ", data: " + result.getResults().get(0).getData());
+
+      // sparkr
+      result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
+      System.out.println("Sparkr dataframe: " + result.getResults().get(0).getData());
+
+      // spark sql
+      result = session.execute("sql", "select * from df");
+      System.out.println("Spark Sql dataframe: " + result.getResults().get(0).getData());
+
+      // spark invalid sql
+      result = session.execute("sql", "select * from unknown_table");
+      System.out.println("Result status: " + result.getStatus() +
+              ", data: " + result.getResults().get(0).getData());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java
new file mode 100644
index 0000000..6dbae58
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+
+/**
+ * Basic example of running zeppelin note/paragraph via ZeppelinClient (low level api)
+ */
+public class ZeppelinClientExample {
+
+  public static void main(String[] args) throws Exception {
+    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+    ZeppelinClient zClient = new ZeppelinClient(clientConfig);
+
+    String zeppelinVersion = zClient.getVersion();
+    System.out.println("Zeppelin version: " + zeppelinVersion);
+
+    String notePath = "/zeppelin_client_examples/note_1";
+    String noteId = null;
+    try {
+      noteId = zClient.createNote(notePath);
+      System.out.println("Created note: " + noteId);
+
+      String paragraphId = zClient.addParagraph(noteId, "the first paragraph", "%python print('hello world')");
+      ParagraphResult paragraphResult = zClient.executeParagraph(noteId, paragraphId);
+      System.out.println("Added new paragraph and execute it.");
+      System.out.println("Paragraph result: " + paragraphResult);
+
+      String paragraphId2 = zClient.addParagraph(noteId, "the second paragraph",
+              "%python\nimport time\ntime.sleep(5)\nprint('done')");
+      zClient.submitParagraph(noteId, paragraphId2);
+      zClient.waitUtilParagraphRunning(noteId, paragraphId2);
+      zClient.cancelParagraph(noteId, paragraphId2);
+      paragraphResult = zClient.waitUtilParagraphFinish(noteId, paragraphId2);
+      System.out.println("Added new paragraph, submit it then cancel it");
+      System.out.println("Paragraph result: " + paragraphResult);
+
+      NoteResult noteResult = zClient.executeNote(noteId);
+      System.out.println("Execute note and the note result: " + noteResult);
+
+      zClient.submitNote(noteId);
+      noteResult = zClient.waitUntilNoteFinished(noteId);
+      System.out.println("Submit note and the note result: " + noteResult);
+    } finally {
+      if (noteId != null) {
+        zClient.deleteNote(noteId);
+        System.out.println("Note " + noteId + " is deleted");
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java
new file mode 100644
index 0000000..05e183c
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Basic example of running existing note via ZeppelinClient (low level api)
+ *
+ */
+public class ZeppelinClientExample2 {
+
+  public static void main(String[] args) throws Exception {
+    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+    ZeppelinClient zClient = new ZeppelinClient(clientConfig);
+
+    String zeppelinVersion = zClient.getVersion();
+    System.out.println("Zeppelin version: " + zeppelinVersion);
+
+    // execute note 2A94M5J1Z paragraph by paragraph
+    try {
+      ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
+      System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
+      System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      Map<String, String> parameters = new HashMap<>();
+      parameters.put("maxAge", "40");
+      paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
+      System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      parameters = new HashMap<>();
+      parameters.put("marital", "married");
+      paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
+      System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
+    } finally {
+      // you need to stop interpreter explicitly if you are running paragraph separately.
+      zClient.stopInterpreter("2A94M5J1Z", "spark");
+    }
+
+    // execute this whole note, this note will run under a didicated interpreter process which will be
+    // stopped after note execution.
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("maxAge", "40");
+    parameters.put("marital", "married");
+    NoteResult noteResult = zClient.executeNote("2A94M5J1Z", parameters);
+    System.out.println("Execute the spark tutorial note, note result: " + noteResult);
+  }
+}
diff --git a/zeppelin-client-examples/src/main/resources/init_stream.scala b/zeppelin-client-examples/src/main/resources/init_stream.scala
new file mode 100644
index 0000000..c81e18a
--- /dev/null
+++ b/zeppelin-client-examples/src/main/resources/init_stream.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
+import java.util.Collections
+import scala.collection.JavaConversions._
+
+senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+senv.enableCheckpointing(5000)
+
+val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
+
+  val pages = Seq("home", "search", "search", "product", "product", "product")
+  var count: Long = 0
+  var running : Boolean = true
+  // startTime is 2018/1/1
+  var startTime: Long = new java.util.Date(2018 - 1900,0,1).getTime
+  var sleepInterval = 500
+
+  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (count < 60 && running) {
+      lock.synchronized({
+        ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
+        count += 1
+        Thread.sleep(sleepInterval)
+      })
+    }
+  }
+
+  override def cancel(): Unit = {
+    running = false
+  }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
+    Collections.singletonList(count)
+  }
+
+  override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
+    state.foreach(s => count = s)
+  }
+
+}).assignAscendingTimestamps(_._1)
+
+stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
diff --git a/zeppelin-client-examples/src/main/resources/log4j.properties b/zeppelin-client-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8daee59
--- /dev/null
+++ b/zeppelin-client-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
diff --git a/zeppelin-client/pom.xml b/zeppelin-client/pom.xml
new file mode 100644
index 0000000..5fab859
--- /dev/null
+++ b/zeppelin-client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-client</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Client</name>
+  <description>Zeppelin Client</description>
+
+  <properties>
+    <unirest.version>3.7.04</unirest.version>
+    <commons-text.version>1.8</commons-text.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-client</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.konghq</groupId>
+      <artifactId>unirest-java</artifactId>
+      <version>${unirest.version}</version>
+      <classifier>standalone</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-client</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+      <version>${commons-text.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java
new file mode 100644
index 0000000..527e033
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client;
+
+/**
+ * Configuration of Zeppelin client, such as zeppelin server rest url and
+ * query interval of polling note/paragraph result.
+ */
+public class ClientConfig {
+  private String zeppelinRestUrl;
+  private long queryInterval ;
+  private boolean useKnox = false;
+
+  public ClientConfig(String zeppelinRestUrl) {
+    this(zeppelinRestUrl, 1000);
+  }
+
+  public ClientConfig(String zeppelinRestUrl, long queryInterval) {
+    this(zeppelinRestUrl, queryInterval, false);
+  }
+
+  public ClientConfig(String zeppelinRestUrl, long queryInterval, boolean useKnox) {
+    this.zeppelinRestUrl = removeTrailingSlash(zeppelinRestUrl);
+    this.queryInterval = queryInterval;
+    this.useKnox = useKnox;
+  }
+
+  private String removeTrailingSlash(String zeppelinRestUrl) {
+    if (zeppelinRestUrl.endsWith("/")) {
+      return zeppelinRestUrl.substring(0, zeppelinRestUrl.length() - 1);
+    } else {
+      return zeppelinRestUrl;
+    }
+  }
+
+  public String getZeppelinRestUrl() {
+    return zeppelinRestUrl;
+  }
+
+  public long getQueryInterval() {
+    return queryInterval;
+  }
+
+  public boolean isUseKnox() {
+    return useKnox;
+  }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java
new file mode 100644
index 0000000..c71bf06
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * Execution result of each statement.
+ *
+ */
+public class ExecuteResult {
+
+  private String statementId;
+  private Status status;
+  // each statement may return multiple results
+  private List<Result> results;
+  // if there's any job in the statement, then it will also contain job urls.
+  // e.g. spark job url
+  private List<String> jobUrls;
+  // if there's any job in the statement, then it will also contain a progress
+  // range from 0 to 100. e.g. spark job progress.
+  private int progress;
+
+  public ExecuteResult(ParagraphResult paragraphResult) {
+    this.statementId = paragraphResult.getParagraphId();
+    this.status = paragraphResult.getStatus();
+    this.progress = paragraphResult.getProgress();
+    this.results = paragraphResult.getResults();
+    this.jobUrls = paragraphResult.getJobUrls();
+  }
+
+  public String getStatementId() {
+    return statementId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public List<Result> getResults() {
+    return results;
+  }
+
+  public List<String> getJobUrls() {
+    return jobUrls;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+
+  @Override
+  public String toString() {
+    return "ExecuteResult{" +
+            "status=" + status +
+            ", progress=" + progress +
+            ", results=" + StringUtils.join(results, ", ") +
+            ", jobUrls=" + jobUrls +
+            '}';
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
similarity index 52%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
index 8a6d0d0..b0cde11 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
@@ -14,32 +14,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
 
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client;
+
+import java.util.List;
 
 /**
- * RestartInterpreter rest api request message.
+ * Represent the note execution result.
  */
-public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
-
-  String noteId;
-
-  public RestartInterpreterRequest() {
+public class NoteResult {
+  private String noteId;
+  private boolean isRunning;
+  private List<ParagraphResult> paragraphResultList;
+
+  public NoteResult(String noteId, boolean isRunning, List<ParagraphResult> paragraphResultList) {
+    this.noteId = noteId;
+    this.isRunning = isRunning;
+    this.paragraphResultList = paragraphResultList;
   }
 
   public String getNoteId() {
     return noteId;
   }
 
-  public String toJson() {
-    return gson.toJson(this);
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  public List<ParagraphResult> getParagraphResultList() {
+    return paragraphResultList;
   }
 
-  public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
+  @Override
+  public String toString() {
+    return "NoteResult{" +
+            "noteId='" + noteId + '\'' +
+            ", isRunning=" + isRunning +
+            ", paragraphResultList=" + paragraphResultList +
+            '}';
   }
 }
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java
new file mode 100644
index 0000000..a28c604
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Represent the paragraph execution result.
+ *
+ */
+public class ParagraphResult {
+  private String paragraphId;
+  private Status status;
+  // if there's any job in the statement, then it will also contain a progress
+  // range from 0 to 100. e.g. spark job progress.
+  private int progress;
+  // each paragraph may return multiple results
+  private List<Result> results;
+  // if there's any job in the statement, then it will also contain job urls.
+  // e.g. spark job url
+  private List<String> jobUrls;
+
+  public ParagraphResult(JSONObject paragraphJson) {
+    this.paragraphId = paragraphJson.getString("id");
+    this.status = Status.valueOf(paragraphJson.getString("status"));
+    this.progress = paragraphJson.getInt("progress");
+    this.results = new ArrayList<>();
+    if (paragraphJson.has("results")) {
+      JSONObject resultJson = paragraphJson.getJSONObject("results");
+      JSONArray msgArray = resultJson.getJSONArray("msg");
+      for (int i = 0; i < msgArray.length(); ++i) {
+        JSONObject resultObject = msgArray.getJSONObject(i);
+        results.add(new Result(resultObject));
+      }
+    }
+
+    this.jobUrls = new ArrayList<>();
+    if (paragraphJson.has("runtimeInfos")) {
+      JSONObject runtimeInfosJson = paragraphJson.getJSONObject("runtimeInfos");
+      if (runtimeInfosJson.has("jobUrl")) {
+        JSONObject jobUrlJson = runtimeInfosJson.getJSONObject("jobUrl");
+        if (jobUrlJson.has("values")) {
+          JSONArray valuesArray = jobUrlJson.getJSONArray("values");
+          for (int i=0;i< valuesArray.length(); ++i) {
+            JSONObject object = valuesArray.getJSONObject(i);
+            if (object.has("jobUrl")) {
+              jobUrls.add(object.getString("jobUrl"));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  public String getParagraphId() {
+    return paragraphId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+
+  public List<Result> getResults() {
+    return results;
+  }
+
+  public List<String> getJobUrls() {
+    return jobUrls;
+  }
+
+  /**
+   * Get results in text format.
+   *
+   * @return
+   */
+  public String getResultInText() {
+    StringBuilder builder = new StringBuilder();
+    if (results != null) {
+      for (Result result : results) {
+        builder.append(result.getData() + "\n");
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public String toString() {
+    return "ParagraphResult{" +
+            "paragraphId='" + paragraphId + '\'' +
+            ", status=" + status +
+            ", results=" + StringUtils.join(results, ", ") +
+            ", progress=" + progress +
+            '}';
+  }
+
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
similarity index 53%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
index 9cf1df1..e1c5b90 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
@@ -14,39 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
 
-import java.util.List;
+package org.apache.zeppelin.client;
 
-import org.apache.zeppelin.common.JsonSerializable;
+import kong.unirest.json.JSONObject;
 
 /**
- *  NewNoteRequest rest api request message.
+ * Represent one segment of result of paragraph. The result of paragraph could consists of
+ * multiple Results.
  */
-public class NewNoteRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
+public class Result {
+  private String type;
+  private String data;
 
-  String name;
-  List<NewParagraphRequest> paragraphs;
-
-  public NewNoteRequest (){
+  public Result(JSONObject jsonObject) {
+    this.type = jsonObject.getString("type");
+    this.data = jsonObject.getString("data");
   }
 
-  public String getName() {
-    return name;
+  public Result(String type, String data) {
+    this.type = type;
+    this.data = data;
   }
 
-  public List<NewParagraphRequest> getParagraphs() {
-    return paragraphs;
+  public String getType() {
+    return type;
   }
 
-  public String toJson() {
-    return gson.toJson(this);
+  public String getData() {
+    return data;
   }
 
-  public static NewNoteRequest fromJson(String json) {
-    return gson.fromJson(json, NewNoteRequest.class);
+  @Override
+  public String toString() {
+    return "Result{" +
+            "type='" + type + '\'' +
+            ", data='" + data + '\'' +
+            '}';
   }
 }
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java
new file mode 100644
index 0000000..4eac5c4
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.json.JSONObject;
+
+
+/**
+ * Represent each ZSession info.
+ */
+public class SessionInfo {
+
+  private String sessionId;
+  private String noteId;
+  private String interpreter;
+  private String state;
+  private String weburl;
+  private String startTime;
+
+  public SessionInfo(String sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  public SessionInfo(JSONObject sessionJson) {
+    if (sessionJson.has("sessionId")) {
+      this.sessionId = sessionJson.getString("sessionId");
+    }
+    if (sessionJson.has("noteId")) {
+      this.noteId = sessionJson.getString("noteId");
+    }
+    if (sessionJson.has("interpreter")) {
+      this.interpreter = sessionJson.getString("interpreter");
+    }
+    if (sessionJson.has("state")) {
+      this.state = sessionJson.getString("state");
+    }
+    if (sessionJson.has("weburl")) {
+      this.weburl = sessionJson.getString("weburl");
+    }
+    if (sessionJson.has("startTime")) {
+      this.startTime = sessionJson.getString("startTime");
+    }
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getInterpreter() {
+    return interpreter;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public String getWeburl() {
+    return weburl;
+  }
+
+  public String getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public String toString() {
+    return "SessionResult{" +
+            "sessionId='" + sessionId + '\'' +
+            ", interpreter='" + interpreter + '\'' +
+            ", state='" + state + '\'' +
+            ", weburl='" + weburl + '\'' +
+            ", startTime='" + startTime + '\'' +
+            '}';
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
similarity index 54%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
index 9cf1df1..a22a026 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
@@ -14,39 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
 
-import java.util.List;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client;
 
 /**
- *  NewNoteRequest rest api request message.
+ * Job status.
+ *
+ * UNKNOWN - Job is not found in remote
+ * READY - Job is not running, ready to run.
+ * PENDING - Job is submitted to scheduler. but not running yet
+ * RUNNING - Job is running.
+ * FINISHED - Job finished run. with success
+ * ERROR - Job finished run. with error
+ * ABORT - Job finished by abort
  */
-public class NewNoteRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
-
-  String name;
-  List<NewParagraphRequest> paragraphs;
-
-  public NewNoteRequest (){
-  }
+public enum Status {
+  UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
 
-  public String getName() {
-    return name;
+  public boolean isReady() {
+    return this == READY;
   }
 
-  public List<NewParagraphRequest> getParagraphs() {
-    return paragraphs;
+  public boolean isRunning() {
+    return this == RUNNING;
   }
 
-  public String toJson() {
-    return gson.toJson(this);
+  public boolean isPending() {
+    return this == PENDING;
   }
 
-  public static NewNoteRequest fromJson(String json) {
-    return gson.fromJson(json, NewNoteRequest.class);
+  public boolean isCompleted() {
+    return this == FINISHED || this == ERROR || this == ABORT;
   }
 }
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
new file mode 100644
index 0000000..ffedf40
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.websocket.Message;
+import org.apache.zeppelin.client.websocket.MessageHandler;
+import org.apache.zeppelin.client.websocket.StatementMessageHandler;
+import org.apache.zeppelin.client.websocket.ZeppelinWebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Each ZSession represent one interpreter process, you can start/stop it, and execute/submit/cancel code.
+ * There's no Zeppelin concept(like note/paragraph) in ZSession.
+ *
+ */
+public class ZSession {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZSession.class);
+
+  private ZeppelinClient zeppelinClient;
+  private String interpreter;
+  private Map<String, String> intpProperties;
+  // max number of retained statements, each statement represent one paragraph.
+  private int maxStatement;
+
+  private SessionInfo sessionInfo;
+
+  private ZeppelinWebSocketClient webSocketClient;
+
+  public ZSession(ClientConfig clientConfig,
+                  String interpreter) throws Exception {
+    this(clientConfig, interpreter, new HashMap<>(), 100);
+  }
+
+  public ZSession(ClientConfig clientConfig,
+                  String interpreter,
+                  Map<String, String> intpProperties,
+                  int maxStatement) throws Exception {
+    this.zeppelinClient = new ZeppelinClient(clientConfig);
+    this.interpreter = interpreter;
+    this.intpProperties = intpProperties;
+    this.maxStatement = maxStatement;
+  }
+
+  private ZSession(ClientConfig clientConfig,
+                   String interpreter,
+                   String sessionId) throws Exception {
+    this.zeppelinClient = new ZeppelinClient(clientConfig);
+    this.interpreter = interpreter;
+    this.sessionInfo = new SessionInfo(sessionId);
+  }
+
+  /**
+   * Start this ZSession, underneath it would create a note for this ZSession and
+   * start a dedicated interpreter group.
+   * This method won't establish websocket connection.
+   *
+   * @throws Exception
+   */
+  public void start() throws Exception {
+    start(null);
+  }
+
+  /**
+   * Start this ZSession, underneath it would create a note for this ZSession and
+   * start a dedicated interpreter process.
+   *
+   * @param messageHandler
+   * @throws Exception
+   */
+  public void start(MessageHandler messageHandler) throws Exception {
+    this.sessionInfo = zeppelinClient.newSession(interpreter);
+
+    // inline configuration
+    StringBuilder builder = new StringBuilder("%" + interpreter + ".conf\n");
+    if (intpProperties != null) {
+      for (Map.Entry<String, String> entry : intpProperties.entrySet()) {
+        builder.append(entry.getKey() + " " + entry.getValue() + "\n");
+      }
+    }
+    String paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Configuration", builder.toString());
+    ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
+    if (paragraphResult.getStatus() != Status.FINISHED) {
+      throw new Exception("Fail to configure session, " + paragraphResult.getResultInText());
+    }
+
+    // start session
+    // add local properties (init) to avoid skip empty paragraph.
+    paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Init", "%" + interpreter + "(init=true)");
+    paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
+    if (paragraphResult.getStatus() != Status.FINISHED) {
+      throw new Exception("Fail to init session, " + paragraphResult.getResultInText());
+    }
+    this.sessionInfo = zeppelinClient.getSession(getSessionId());
+
+    if (messageHandler != null) {
+      this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
+      this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
+              .replace("https", "ws").replace("http", "ws") + "/ws");
+
+      // call GET_NOTE to establish websocket connection between this session and zeppelin-server
+      Message msg = new Message(Message.OP.GET_NOTE);
+      msg.put("id", getNoteId());
+      this.webSocketClient.send(msg);
+    }
+  }
+
+  /**
+   * Stop this underlying interpreter process.
+   *
+   * @throws Exception
+   */
+  public void stop() throws Exception {
+    if (getSessionId() != null) {
+      zeppelinClient.stopSession(getSessionId());
+    }
+    if (webSocketClient != null) {
+      webSocketClient.stop();
+    }
+  }
+
+  /**
+   * Session has been started in ZeppelinServer, this method is just to reconnect it.
+   * This method is used for connect to an existing session in ZeppelinServer, instead of
+   * start it from ZSession.
+   * @throws Exception
+   */
+  public static ZSession createFromExistingSession(ClientConfig clientConfig,
+                                                   String interpreter,
+                                                   String sessionId) throws Exception {
+    return createFromExistingSession(clientConfig, interpreter, sessionId, null);
+  }
+
+  /**
+   * Session has been started in ZeppelinServer, this method is just to reconnect it.
+   * This method is used for connect to an existing session in ZeppelinServer, instead of
+   * start it from ZSession.
+   * @throws Exception
+   */
+  public static ZSession createFromExistingSession(ClientConfig clientConfig,
+                                                   String interpreter,
+                                                   String sessionId,
+                                                   MessageHandler messageHandler) throws Exception {
+    ZSession session = new ZSession(clientConfig, interpreter, sessionId);
+    session.reconnect(messageHandler);
+    return session;
+  }
+
+  private void reconnect(MessageHandler messageHandler) throws Exception {
+    this.sessionInfo = this.zeppelinClient.getSession(getSessionId());
+    if (!sessionInfo.getState().equalsIgnoreCase("Running")) {
+      throw new Exception("Session " + getSessionId() + " is not running, state: " + sessionInfo.getState());
+    }
+
+    if (messageHandler != null) {
+      this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
+      this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
+              .replace("https", "ws").replace("http", "ws") + "/ws");
+
+      // call GET_NOTE to establish websocket connection between this session and zeppelin-server
+      Message msg = new Message(Message.OP.GET_NOTE);
+      msg.put("id", getNoteId());
+      this.webSocketClient.send(msg);
+    }
+  }
+
+  /**
+   * Run code in non-blocking way.
+   *
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String code) throws Exception {
+    return execute("", code);
+  }
+
+  /**
+   * Run code in non-blocking way.
+   *
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String code, StatementMessageHandler messageHandler) throws Exception {
+    return execute("", code, messageHandler);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String subInterpreter, String code) throws Exception {
+    return execute(subInterpreter, new HashMap<>(), code);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
+    return execute(subInterpreter, new HashMap<>(), code, messageHandler);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param localProperties
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String subInterpreter,
+                               Map<String, String> localProperties,
+                               String code) throws Exception {
+    return execute(subInterpreter, localProperties, code, null);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param localProperties
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult execute(String subInterpreter,
+                               Map<String, String> localProperties,
+                               String code,
+                               StatementMessageHandler messageHandler) throws Exception {
+    StringBuilder builder = new StringBuilder("%" + interpreter);
+    if (!StringUtils.isBlank(subInterpreter)) {
+      builder.append("." + subInterpreter);
+    }
+    if (localProperties != null && !localProperties.isEmpty()) {
+      builder.append("(");
+      for (Map.Entry<String, String> entry : localProperties.entrySet()) {
+        builder.append(entry.getKey() + "=\"" + entry.getValue() + "\"");
+      }
+      builder.append(")");
+    }
+    builder.append("\n" + code);
+
+    String text = builder.toString();
+
+    String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
+    zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
+
+    if (messageHandler != null) {
+      webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
+    }
+    ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), nextParagraphId, getSessionId());
+    return new ExecuteResult(paragraphResult);
+  }
+
+  /**
+   *
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String code) throws Exception {
+    return submit("", code);
+  }
+
+  /**
+   *
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String code, StatementMessageHandler messageHandler) throws Exception {
+    return submit("", code, messageHandler);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String subInterpreter, String code) throws Exception {
+    return submit(subInterpreter, new HashMap<>(), code);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
+    return submit(subInterpreter, new HashMap<>(), code, messageHandler);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String subInterpreter, Map<String, String> localProperties, String code) throws Exception {
+    return submit(subInterpreter, localProperties, code, null);
+  }
+
+  /**
+   *
+   * @param subInterpreter
+   * @param code
+   * @param messageHandler
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult submit(String subInterpreter,
+                              Map<String, String> localProperties,
+                              String code,
+                              StatementMessageHandler messageHandler) throws Exception {
+    StringBuilder builder = new StringBuilder("%" + interpreter);
+    if (!StringUtils.isBlank(subInterpreter)) {
+      builder.append("." + subInterpreter);
+    }
+    if (localProperties != null && !localProperties.isEmpty()) {
+      builder.append("(");
+      for (Map.Entry<String, String> entry : localProperties.entrySet()) {
+        builder.append(entry.getKey() + "=\"" + entry.getValue() + "\"");
+      }
+      builder.append(")");
+    }
+    builder.append("\n" + code);
+
+    String text = builder.toString();
+    String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
+    zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
+    if (messageHandler != null) {
+      webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
+    }
+    ParagraphResult paragraphResult = zeppelinClient.submitParagraph(getNoteId(), nextParagraphId, getSessionId());
+    return new ExecuteResult(paragraphResult);
+  }
+
+  /**
+   *
+   * @param statementId
+   * @throws Exception
+   */
+  public void cancel(String statementId) throws Exception {
+    zeppelinClient.cancelParagraph(getNoteId(), statementId);
+  }
+
+  /**
+   *
+   * @param statementId
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult queryStatement(String statementId) throws Exception {
+    ParagraphResult paragraphResult = zeppelinClient.queryParagraphResult(getNoteId(), statementId);
+    return new ExecuteResult(paragraphResult);
+  }
+
+  /**
+   *
+   * @param statementId
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult waitUntilFinished(String statementId) throws Exception {
+    ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(getNoteId(), statementId);
+    return new ExecuteResult(paragraphResult);
+  }
+
+  /**
+   *
+   * @param statementId
+   * @return
+   * @throws Exception
+   */
+  public ExecuteResult waitUntilRunning(String statementId) throws Exception {
+    ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphRunning(getNoteId(), statementId);
+    return new ExecuteResult(paragraphResult);
+  }
+
+  public String getNoteId() {
+    if (this.sessionInfo != null) {
+      return this.sessionInfo.getNoteId();
+    } else {
+      return null;
+    }
+  }
+
+  public String getWeburl() {
+    if (this.sessionInfo != null) {
+      return sessionInfo.getWeburl();
+    } else {
+      return null;
+    }
+  }
+
+  public String getSessionId() {
+    if (this.sessionInfo != null) {
+      return this.sessionInfo.getSessionId();
+    } else {
+      return null;
+    }
+  }
+
+  public String getInterpreter() {
+    return interpreter;
+  }
+
+  public ZeppelinClient getZeppelinClient() {
+    return zeppelinClient;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private ClientConfig clientConfig;
+    private String interpreter;
+    private Map<String, String> intpProperties;
+    private int maxStatement = 100;
+
+    public Builder setClientConfig(ClientConfig clientConfig) {
+      this.clientConfig = clientConfig;
+      return this;
+    }
+
+    public Builder setInterpreter(String interpreter) {
+      this.interpreter = interpreter;
+      return this;
+    }
+
+    public Builder setIntpProperties(Map<String, String> intpProperties) {
+      this.intpProperties = intpProperties;
+      return this;
+    }
+
+    public ZSession build() throws Exception {
+      return new ZSession(clientConfig, interpreter, intpProperties, maxStatement);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    ClientConfig clientConfig = new ClientConfig("http://localhost:8080", 1000);
+//    ZSession hiveSession = new ZSession(clientConfig, "hive", new HashMap<>(), 100);
+//    hiveSession.start();
+//
+//    ExecuteResult executeResult = hiveSession.submit("show tables");
+//    executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
+//    System.out.println(executeResult.toString());
+//
+//    executeResult = hiveSession.submit("select eid, count(1) from employee group by eid");
+//    executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
+//    System.out.println(executeResult.toString());
+
+    ZSession sparkSession = ZSession.createFromExistingSession(clientConfig, "hive", "hive_1598418780469");
+    ExecuteResult executeResult = sparkSession.execute("show databases");
+    System.out.println(executeResult);
+
+//    ExecuteResult executeResult = sparkSession.submit("sql", "show tables");
+//    executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
+//    System.out.println(executeResult.toString());
+//
+//    executeResult = sparkSession.submit("sql", "select eid, count(1) from employee group by eid");
+//    executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
+//    System.out.println(executeResult.toString());
+  }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
new file mode 100644
index 0000000..9bb5265
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.GetRequest;
+import kong.unirest.HttpResponse;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.apache.ApacheClient;
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import unirest.shaded.org.apache.http.client.HttpClient;
+import unirest.shaded.org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.SSLContext;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Low level api for interacting with Zeppelin. Underneath, it use the zeppelin rest api.
+ * You can use this class to operate Zeppelin note/paragraph,
+ * e.g. get/add/delete/update/execute/cancel
+ */
+public class ZeppelinClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinClient.class);
+
+  private ClientConfig clientConfig;
+
+  public ZeppelinClient(ClientConfig clientConfig) throws Exception {
+    this.clientConfig = clientConfig;
+    Unirest.config().defaultBaseUrl(clientConfig.getZeppelinRestUrl() + "/api");
+
+    if (clientConfig.isUseKnox()) {
+      try {
+        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy() {
+          public boolean isTrusted(X509Certificate[] chain, String authType) {
+            return true;
+          }
+        }).build();
+        HttpClient customHttpClient = HttpClients.custom().setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier()).build();
+        Unirest.config().httpClient(ApacheClient.builder(customHttpClient));
+      } catch (Exception e) {
+        throw new Exception("Fail to setup httpclient of Unirest", e);
+      }
+    }
+  }
+
+  public ClientConfig getClientConfig() {
+    return clientConfig;
+  }
+
+  /**
+   * Throw exception if the status code is not 200.
+   *
+   * @param response
+   * @throws Exception
+   */
+  private void checkResponse(HttpResponse<JsonNode> response) throws Exception {
+    if (response.getStatus() == 302) {
+      throw new Exception("Please login first");
+    }
+    if (response.getStatus() != 200) {
+      throw new Exception(String.format("Unable to call rest api, status: %s, statusText: %s, message: %s",
+              response.getStatus(),
+              response.getStatusText(),
+              response.getBody().getObject().getString("message")));
+    }
+  }
+
+  /**
+   * Throw exception if the status in the json object is not `OK`.
+   *
+   * @param jsonNode
+   * @throws Exception
+   */
+  private void checkJsonNodeStatus(JsonNode jsonNode) throws Exception {
+    if (! "OK".equalsIgnoreCase(jsonNode.getObject().getString("status"))) {
+      throw new Exception(StringEscapeUtils.unescapeJava(jsonNode.getObject().getString("message")));
+    }
+  }
+
+  /**
+   * Get Zeppelin version.
+   *
+   * @return
+   * @throws Exception
+   */
+  public String getVersion() throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/version")
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return jsonNode.getObject().getJSONObject("body").getString("version");
+  }
+
+  /**
+   * Request a new session id. It doesn't create session (interpreter process) in zeppelin server side, but just
+   * create an unique session id.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public SessionInfo newSession(String interpreter) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .post("/session")
+            .queryString("interpreter", interpreter)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return new SessionInfo(jsonNode.getObject().getJSONObject("body"));
+  }
+
+  /**
+   * Stop the session(interpreter process) in Zeppelin server.
+   *
+   * @param sessionId
+   * @throws Exception
+   */
+  public void stopSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .delete("/session/{sessionId}")
+            .routeParam("sessionId", sessionId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Get session info for the provided sessionId.
+   *
+   * @param sessionId
+   * @throws Exception
+   */
+  public SessionInfo getSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/session/{sessionId}")
+            .routeParam("sessionId", sessionId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    JSONObject bodyObject = jsonNode.getObject().getJSONObject("body");
+    return new SessionInfo(bodyObject);
+  }
+
+  /**
+   * List all the sessions.
+   *
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> listSessions() throws Exception {
+    return listSessions(null);
+  }
+
+  /**
+   * List all the sessions for the provided interpreter.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> listSessions(String interpreter) throws Exception {
+    GetRequest getRequest = Unirest.get("/session");
+    if (interpreter != null) {
+      getRequest.queryString("interpreter", interpreter);
+    }
+    HttpResponse<JsonNode> response = getRequest.asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    JSONArray sessionJsonArray = jsonNode.getObject().getJSONArray("body");
+    List<SessionInfo> sessionInfos = new ArrayList<>();
+    for (int i = 0; i< sessionJsonArray.length();++i) {
+      sessionInfos.add(new SessionInfo(sessionJsonArray.getJSONObject(i)));
+    }
+    return sessionInfos;
+  }
+
+  /**
+   * Login zeppelin with userName and password, throw exception if login fails.
+   *
+   * @param userName
+   * @param password
+   * @throws Exception
+   */
+  public void login(String userName, String password) throws Exception {
+    if (clientConfig.isUseKnox()) {
+      HttpResponse<String> response = Unirest.get("/")
+              .basicAuth(userName, password)
+              .asString();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    } else {
+      HttpResponse<JsonNode> response = Unirest
+              .post("/login")
+              .field("userName", userName)
+              .field("password", password)
+              .asJson();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    }
+  }
+
+  public String createNote(String notePath) throws Exception {
+    return createNote(notePath, "");
+  }
+
+  /**
+   * Create a new empty note with provided notePath and defaultInterpreterGroup
+   *
+   * @param notePath
+   * @param defaultInterpreterGroup
+   * @return
+   * @throws Exception
+   */
+  public String createNote(String notePath, String defaultInterpreterGroup) throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("name", notePath);
+    bodyObject.put("defaultInterpreterGroup", defaultInterpreterGroup);
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook")
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    return jsonNode.getObject().getString("body");
+  }
+
+  /**
+   * Delete note with provided noteId.
+   *
+   * @param noteId
+   * @throws Exception
+   */
+  public void deleteNote(String noteId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .delete("/notebook/{noteId}")
+            .routeParam("noteId", noteId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Query {@link NoteResult} with provided noteId.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult queryNoteResult(String noteId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/notebook/{noteId}")
+            .routeParam("noteId", noteId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    JSONObject noteJsonObject = jsonNode.getObject().getJSONObject("body");
+    boolean isRunning = false;
+    if (noteJsonObject.has("info")) {
+      JSONObject infoJsonObject = noteJsonObject.getJSONObject("info");
+      if (infoJsonObject.has("isRunning")) {
+        isRunning = Boolean.parseBoolean(infoJsonObject.getString("isRunning"));
+      }
+    }
+
+    List<ParagraphResult> paragraphResultList = new ArrayList<>();
+    if (noteJsonObject.has("paragraphs")) {
+      JSONArray paragraphJsonArray = noteJsonObject.getJSONArray("paragraphs");
+      for (int i = 0; i< paragraphJsonArray.length(); ++i) {
+        paragraphResultList.add(new ParagraphResult(paragraphJsonArray.getJSONObject(i)));
+      }
+    }
+
+    return new NoteResult(noteId, isRunning, paragraphResultList);
+  }
+
+  /**
+   * Execute note with provided noteId, return until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId) throws Exception {
+    return executeNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Execute note with provided noteId and parameters, return until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId, Map<String, String> parameters) throws Exception {
+    submitNote(noteId, parameters);
+    return waitUntilNoteFinished(noteId);
+  }
+
+  /**
+   * Submit note to execute with provided noteId, return at once the submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult submitNote(String noteId) throws Exception  {
+    return submitNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Submit note to execute with provided noteId and parameters, return at once the submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public NoteResult submitNote(String noteId, Map<String, String> parameters) throws Exception  {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("params", parameters);
+    // run note in non-blocking and isolated way.
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook/job/{noteId}")
+            .routeParam("noteId", noteId)
+            .queryString("blocking", "false")
+            .queryString("isolated", "true")
+            .body(bodyObject)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return queryNoteResult(noteId);
+  }
+
+  /**
+   * Block there until note execution is completed.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult waitUntilNoteFinished(String noteId) throws Exception {
+    while (true) {
+      NoteResult noteResult = queryNoteResult(noteId);
+      if (!noteResult.isRunning()) {
+        return noteResult;
+      }
+      Thread.sleep(clientConfig.getQueryInterval());
+    }
+  }
+
+  /**
+   * Block there until note execution is completed, and throw exception if note execution is not completed
+   * in <code>timeoutInMills</code>.
+   *
+   * @param noteId
+   * @param timeoutInMills
+   * @return
+   * @throws Exception
+   */
+  public NoteResult waitUntilNoteFinished(String noteId, long timeoutInMills) throws Exception {
+    long start = System.currentTimeMillis();
+    while (true && (System.currentTimeMillis() - start) < timeoutInMills) {
+      NoteResult noteResult = queryNoteResult(noteId);
+      if (!noteResult.isRunning()) {
+        return noteResult;
+      }
+      Thread.sleep(clientConfig.getQueryInterval());
+    }
+    throw new Exception("Note is not finished in " + timeoutInMills / 1000 + " seconds");
+  }
+
+  /**
+   * Add paragraph to note with provided title and text.
+   *
+   * @param noteId
+   * @param title
+   * @param text
+   * @return
+   * @throws Exception
+   */
+  public String addParagraph(String noteId, String title, String text) throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("title", title);
+    bodyObject.put("text", text);
+    HttpResponse<JsonNode> response = Unirest.post("/notebook/{noteId}/paragraph")
+            .routeParam("noteId", noteId)
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    return jsonNode.getObject().getString("body");
+  }
+
+  /**
+   * Update paragraph with specified title and text.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param title
+   * @param text
+   * @throws Exception
+   */
+  public void updateParagraph(String noteId, String paragraphId, String title, String text) throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("title", title);
+    bodyObject.put("text", text);
+    HttpResponse<JsonNode> response = Unirest.put("/notebook/{noteId}/paragraph/{paragraphId}")
+            .routeParam("noteId", noteId)
+            .routeParam("paragraphId", paragraphId)
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Execute paragraph with parameters in specified session. If sessionId is null or empty string, then it depends on
+   * the interpreter binding mode of Note(e.g. isolated per note), otherwise it will run in the specified session.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param sessionId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult executeParagraph(String noteId,
+                                          String paragraphId,
+                                          String sessionId,
+                                          Map<String, String> parameters) throws Exception {
+    submitParagraph(noteId, paragraphId, sessionId, parameters);
+    return waitUtilParagraphFinish(noteId, paragraphId);
+  }
+
+  /**
+   * Execute paragraph with parameters.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult executeParagraph(String noteId,
+                                          String paragraphId,
+                                          Map<String, String> parameters) throws Exception {
+    return executeParagraph(noteId, paragraphId, "", parameters);
+  }
+
+  /**
+   * Execute paragraph in specified session. If sessionId is null or empty string, then it depends on
+   * the interpreter binding mode of Note (e.g. isolated per note), otherwise it will run in the specified session.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param sessionId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult executeParagraph(String noteId,
+                                          String paragraphId,
+                                          String sessionId) throws Exception {
+    return executeParagraph(noteId, paragraphId, sessionId, new HashMap<>());
+  }
+
+  /**
+   * Execute paragraph.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult executeParagraph(String noteId, String paragraphId) throws Exception {
+    return executeParagraph(noteId, paragraphId, "", new HashMap<>());
+  }
+
+  /**
+   * Submit paragraph to execute with provided parameters and sessionId. Return at once the submission is completed.
+   * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param sessionId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult submitParagraph(String noteId,
+                                         String paragraphId,
+                                         String sessionId,
+                                         Map<String, String> parameters) throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("params", parameters);
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook/job/{noteId}/{paragraphId}")
+            .routeParam("noteId", noteId)
+            .routeParam("paragraphId", paragraphId)
+            .queryString("sessionId", sessionId)
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return queryParagraphResult(noteId, paragraphId);
+  }
+
+  /**
+   * Submit paragraph to execute with provided sessionId. Return at once the submission is completed.
+   * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param sessionId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult submitParagraph(String noteId,
+                                         String paragraphId,
+                                         String sessionId) throws Exception {
+    return submitParagraph(noteId, paragraphId, sessionId, new HashMap<>());
+  }
+
+  /**
+   * Submit paragraph to execute with provided parameters. Return at once the submission is completed.
+   * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult submitParagraph(String noteId,
+                                         String paragraphId,
+                                         Map<String, String> parameters) throws Exception {
+    return submitParagraph(noteId, paragraphId, "", parameters);
+  }
+
+  /**
+   * Submit paragraph to execute. Return at once the submission is completed.
+   * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult submitParagraph(String noteId, String paragraphId) throws Exception {
+    return submitParagraph(noteId, paragraphId, "", new HashMap<>());
+  }
+
+  /**
+   * This used by {@link ZSession} for creating or reusing a paragraph for executing another piece of code.
+   *
+   * @param noteId
+   * @param maxParagraph
+   * @return
+   * @throws Exception
+   */
+  public String nextSessionParagraph(String noteId, int maxParagraph) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook/{noteId}/paragraph/next")
+            .routeParam("noteId", noteId)
+            .queryString("maxParagraph", maxParagraph)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    return jsonNode.getObject().getString("message");
+  }
+
+  /**
+   * Cancel a running paragraph.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @throws Exception
+   */
+  public void cancelParagraph(String noteId, String paragraphId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .delete("/notebook/job/{noteId}/{paragraphId}")
+            .routeParam("noteId", noteId)
+            .routeParam("paragraphId", paragraphId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+
+  /**
+   * Query {@link ParagraphResult}
+   *
+   * @param noteId
+   * @param paragraphId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult queryParagraphResult(String noteId, String paragraphId) throws Exception {
+    HttpResponse<JsonNode> response = Unirest
+            .get("/notebook/{noteId}/paragraph/{paragraphId}")
+            .routeParam("noteId", noteId)
+            .routeParam("paragraphId", paragraphId)
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+
+    JSONObject paragraphJson = jsonNode.getObject().getJSONObject("body");
+    return new ParagraphResult(paragraphJson);
+  }
+
+  /**
+   * Query {@link ParagraphResult} until it is finished.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId) throws Exception {
+    while (true) {
+      ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+      LOGGER.debug(paragraphResult.toString());
+      if (paragraphResult.getStatus().isCompleted()) {
+        return paragraphResult;
+      }
+      Thread.sleep(clientConfig.getQueryInterval());
+    }
+  }
+
+  /**
+   * Query {@link ParagraphResult} until it is finished or timeout.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @param timeoutInMills
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId, long timeoutInMills) throws Exception {
+    long start = System.currentTimeMillis();
+    while (true && (System.currentTimeMillis() - start) < timeoutInMills) {
+      ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+      if (paragraphResult.getStatus().isCompleted()) {
+        return paragraphResult;
+      }
+      Thread.sleep(clientConfig.getQueryInterval());
+    }
+    throw new Exception("Paragraph is not finished in " + timeoutInMills / 1000 + " seconds");
+  }
+
+  /**
+   * Query {@link ParagraphResult} until it is running.
+   *
+   * @param noteId
+   * @param paragraphId
+   * @return
+   * @throws Exception
+   */
+  public ParagraphResult waitUtilParagraphRunning(String noteId, String paragraphId) throws Exception {
+    while (true) {
+      ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+      if (paragraphResult.getStatus().isRunning()) {
+        return paragraphResult;
+      }
+      Thread.sleep(clientConfig.getQueryInterval());
+    }
+  }
+
+  /**
+   * This is equal to the restart operation in note page.
+   *
+   * @param noteId
+   * @param interpreter
+   */
+  public void stopInterpreter(String noteId, String interpreter) throws Exception {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("noteId", noteId);
+    HttpResponse<JsonNode> response = Unirest
+            .put("/interpreter/setting/restart/{interpreter}")
+            .routeParam("interpreter", interpreter)
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+  }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java
new file mode 100644
index 0000000..1aa9eee
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class of MessageHandler.
+ */
+public abstract class AbstractMessageHandler implements MessageHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageHandler.class);
+  private static final Gson GSON = new Gson();
+
+  @Override
+  public void onMessage(String msg) {
+    try {
+      Message messageReceived = GSON.fromJson(msg, Message.class);
+      if (messageReceived.op != Message.OP.PING) {
+        LOGGER.debug("RECEIVE: " + messageReceived.op +
+                ", RECEIVE PRINCIPAL: " + messageReceived.principal +
+                ", RECEIVE TICKET: " + messageReceived.ticket +
+                ", RECEIVE ROLES: " + messageReceived.roles +
+                ", RECEIVE DATA: " + messageReceived.data);
+      }
+
+      switch (messageReceived.op) {
+        case PARAGRAPH_UPDATE_OUTPUT:
+          String noteId = (String) messageReceived.data.get("noteId");
+          String paragraphId = (String) messageReceived.data.get("paragraphId");
+          int index = (int) Double.parseDouble(messageReceived.data.get("index").toString());
+          String type = (String) messageReceived.data.get("type");
+          String output = (String) messageReceived.data.get("data");
+          onStatementUpdateOutput(paragraphId, index, type, output);
+          break;
+        case PARAGRAPH_APPEND_OUTPUT:
+          noteId = (String) messageReceived.data.get("noteId");
+          paragraphId = (String) messageReceived.data.get("paragraphId");
+          index = (int) Double.parseDouble(messageReceived.data.get("index").toString());
+          output = (String) messageReceived.data.get("data");
+          onStatementAppendOutput(paragraphId, index, output);
+          break;
+        default:
+          break;
+      }
+    } catch (Exception e) {
+      LOGGER.error("Can't handle message: " + msg, e);
+    }
+  }
+
+  /**
+   * Invoked when there's new statement output appended.
+   *
+   * @param statementId
+   * @param index
+   * @param output
+   */
+  public abstract void onStatementAppendOutput(String statementId, int index, String output);
+
+  /**
+   * Invoked when statement's output is updated.
+   *
+   * @param statementId
+   * @param index
+   * @param type
+   * @param output
+   */
+  public abstract void onStatementUpdateOutput(String statementId, int index, String type, String output);
+
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java
new file mode 100644
index 0000000..51169fb
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CompositeMessageHandler allos you to add StatementHandler for each statement.
+ */
+public class CompositeMessageHandler extends AbstractMessageHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CompositeMessageHandler.class);
+
+  private Map<String, StatementMessageHandler> messageHandlers = new HashMap();
+
+  public void addStatementMessageHandler(String statementId,
+                                         StatementMessageHandler messageHandler) {
+    messageHandlers.put(statementId, messageHandler);
+  }
+
+  @Override
+  public void onStatementAppendOutput(String statementId, int index, String output) {
+    StatementMessageHandler messageHandler = messageHandlers.get(statementId);
+    if (messageHandler == null) {
+      LOGGER.warn("No messagehandler for statement: " + statementId);
+      return;
+    }
+    messageHandler.onStatementAppendOutput(statementId, index, output);
+  }
+
+  @Override
+  public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+    StatementMessageHandler messageHandler = messageHandlers.get(statementId);
+    if (messageHandler == null) {
+      LOGGER.warn("No messagehandler for statement: " + statementId);
+      return;
+    }
+    messageHandler.onStatementUpdateOutput(statementId, index, type, output);
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
index 8a6d0d0..914243c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
@@ -14,32 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
 
 /**
- * RestartInterpreter rest api request message.
+ * Interface for class that can be serialized to json
  */
-public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
-
-  String noteId;
-
-  public RestartInterpreterRequest() {
-  }
-
-  public String getNoteId() {
-    return noteId;
-  }
-
-  public String toJson() {
-    return gson.toJson(this);
-  }
+public interface JsonSerializable {
 
-  public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
-  }
+  String toJson();
 }
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java
new file mode 100644
index 0000000..cca3a46
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Copied from zeppelin-zengine (TODO, zjffdu). Should resume the same piece of code instead of copying.
+ * Zeppelin websocket message template class.
+ */
+public class Message implements JsonSerializable {
+  /**
+   * Representation of event type.
+   */
+  public static enum OP {
+    GET_HOME_NOTE,    // [c-s] load note for home screen
+
+    GET_NOTE,         // [c-s] client load note
+                      // @param id note id
+
+    NOTE,             // [s-c] note info
+                      // @param note serialized Note object
+
+    PARAGRAPH,        // [s-c] paragraph info
+                      // @param paragraph serialized paragraph object
+
+    PROGRESS,         // [s-c] progress update
+                      // @param id paragraph id
+                      // @param progress percentage progress
+
+    NEW_NOTE,         // [c-s] create new notebook
+    DEL_NOTE,         // [c-s] delete notebook
+                      // @param id note id
+    REMOVE_FOLDER,
+    MOVE_NOTE_TO_TRASH,
+    MOVE_FOLDER_TO_TRASH,
+    RESTORE_FOLDER,
+    RESTORE_NOTE,
+    RESTORE_ALL,
+    EMPTY_TRASH,
+    CLONE_NOTE,       // [c-s] clone new notebook
+                      // @param id id of note to clone
+                      // @param name name for the cloned note
+    IMPORT_NOTE,      // [c-s] import notebook
+                      // @param object notebook
+
+    CONVERT_NOTE_NBFORMAT,     // [c-s] converting note to nbformat
+    CONVERTED_NOTE_NBFORMAT,     // [s-c] converting note to nbformat
+
+    NOTE_UPDATE,
+
+    NOTE_RENAME,
+
+    UPDATE_PERSONALIZED_MODE, // [c-s] update personalized mode (boolean)
+                              // @param note id and boolean personalized mode value
+
+    FOLDER_RENAME,
+
+    RUN_PARAGRAPH,    // [c-s] run paragraph
+                      // @param id paragraph id
+                      // @param paragraph paragraph content.ie. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    COMMIT_PARAGRAPH, // [c-s] commit paragraph
+                      // @param id paragraph id
+                      // @param title paragraph title
+                      // @param paragraph paragraph content.ie. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
+                      // @param id paragraph id
+
+    MOVE_PARAGRAPH,   // [c-s] move paragraph order
+                      // @param id paragraph id
+                      // @param index index the paragraph want to go
+
+    INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
+                      // @param target index
+
+    COPY_PARAGRAPH,   // [c-s] create new para below current para as a copy of current para
+                      // @param target index
+                      // @param title paragraph title
+                      // @param paragraph paragraph content.ie. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    EDITOR_SETTING,   // [c-s] ask paragraph editor setting
+                      // @param paragraph text keyword written in paragraph
+                      // ex) spark.spark or spark
+
+    COMPLETION,       // [c-s] ask completion candidates
+                      // @param id
+                      // @param buf current code
+                      // @param cursor cursor position in code
+
+    COMPLETION_LIST,  // [s-c] send back completion candidates list
+                      // @param id
+                      // @param completions list of string
+
+    LIST_NOTES,                   // [c-s] ask list of note
+    RELOAD_NOTES_FROM_REPO,       // [c-s] reload notes from repo
+
+    NOTES_INFO,                   // [s-c] list of note infos
+                                  // @param notes serialized List<NoteInfo> object
+
+    PARAGRAPH_REMOVE,
+    PARAGRAPH_CLEAR_OUTPUT,       // [c-s] clear output of paragraph
+    PARAGRAPH_CLEAR_ALL_OUTPUT,   // [c-s] clear output of all paragraphs
+    PARAGRAPH_APPEND_OUTPUT,      // [s-c] append output
+    PARAGRAPH_UPDATE_OUTPUT,      // [s-c] update (replace) output
+    PING,
+    AUTH_INFO,
+
+    ANGULAR_OBJECT_UPDATE,        // [s-c] add/update angular object
+    ANGULAR_OBJECT_REMOVE,        // [s-c] add angular object del
+    
+    ANGULAR_OBJECT_UPDATED,       // [c-s] angular object value updated,
+
+    ANGULAR_OBJECT_CLIENT_BIND,   // [c-s] angular object updated from AngularJS z object
+
+    ANGULAR_OBJECT_CLIENT_UNBIND, // [c-s] angular object unbind from AngularJS z object
+
+    LIST_CONFIGURATIONS,          // [c-s] ask all key/value pairs of configurations
+    CONFIGURATIONS_INFO,          // [s-c] all key/value pairs of configurations
+                                  // @param settings serialized Map<String, String> object
+
+    CHECKPOINT_NOTE,              // [c-s] checkpoint note to storage repository
+                                  // @param noteId
+                                  // @param checkpointName
+
+    LIST_REVISION_HISTORY,        // [c-s] list revision history of the notebook
+                                  // @param noteId
+    NOTE_REVISION,                // [c-s] get certain revision of note
+                                  // @param noteId
+                                  // @param revisionId
+    SET_NOTE_REVISION,            // [c-s] set current notebook head to this revision
+                                  // @param noteId
+                                  // @param revisionId
+    NOTE_REVISION_FOR_COMPARE,    // [c-s] get certain revision of note for compare
+                                  // @param noteId
+                                  // @param revisionId
+                                  // @param position
+    APP_APPEND_OUTPUT,            // [s-c] append output
+    APP_UPDATE_OUTPUT,            // [s-c] update (replace) output
+    APP_LOAD,                     // [s-c] on app load
+    APP_STATUS_CHANGE,            // [s-c] on app status change
+
+    LIST_NOTE_JOBS,               // [c-s] get note job management information
+    LIST_UPDATE_NOTE_JOBS,        // [c-s] get job management information for until unixtime
+    UNSUBSCRIBE_UPDATE_NOTE_JOBS, // [c-s] unsubscribe job information for job management
+    // @param unixTime
+    GET_INTERPRETER_BINDINGS,    // [c-s] get interpreter bindings
+    SAVE_INTERPRETER_BINDINGS,    // [c-s] save interpreter bindings
+    INTERPRETER_BINDINGS,         // [s-c] interpreter bindings
+
+    GET_INTERPRETER_SETTINGS,     // [c-s] get interpreter settings
+    INTERPRETER_SETTINGS,         // [s-c] interpreter settings
+    ERROR_INFO,                   // [s-c] error information to be sent
+    SESSION_LOGOUT,               // [s-c] error information to be sent
+    WATCHER,                      // [s-c] Change websocket to watcher mode.
+    PARAGRAPH_ADDED,              // [s-c] paragraph is added
+    PARAGRAPH_REMOVED,            // [s-c] paragraph deleted
+    PARAGRAPH_MOVED,              // [s-c] paragraph moved
+    NOTE_UPDATED,                 // [s-c] paragraph updated(name, config)
+    RUN_ALL_PARAGRAPHS,           // [c-s] run all paragraphs
+    PARAGRAPH_EXECUTED_BY_SPELL,  // [c-s] paragraph was executed by spell
+    RUN_PARAGRAPH_USING_SPELL,    // [s-c] run paragraph using spell
+    PARAS_INFO,                   // [s-c] paragraph runtime infos
+    SAVE_NOTE_FORMS,              // save note forms
+    REMOVE_NOTE_FORMS,            // remove note forms
+    INTERPRETER_INSTALL_STARTED,  // [s-c] start to download an interpreter
+    INTERPRETER_INSTALL_RESULT,   // [s-c] Status of an interpreter installation
+    COLLABORATIVE_MODE_STATUS,    // [s-c] collaborative mode status
+    PATCH_PARAGRAPH,              // [c-s][s-c] patch editor text
+    NOTE_RUNNING_STATUS,        // [s-c] sequential run status will be change
+    NOTICE                        // [s-c] Notice
+  }
+
+  // these messages will be ignored during the sequential run of the note
+  private static final Set<OP> disabledForRunningNoteMessages = Collections
+      .unmodifiableSet(new HashSet<>(Arrays.asList(
+          OP.COMMIT_PARAGRAPH,
+          OP.RUN_PARAGRAPH,
+          OP.RUN_PARAGRAPH_USING_SPELL,
+          OP.RUN_ALL_PARAGRAPHS,
+          OP.PARAGRAPH_CLEAR_OUTPUT,
+          OP.PARAGRAPH_CLEAR_ALL_OUTPUT,
+          OP.INSERT_PARAGRAPH,
+          OP.MOVE_PARAGRAPH,
+          OP.COPY_PARAGRAPH,
+          OP.PARAGRAPH_REMOVE,
+          OP.MOVE_NOTE_TO_TRASH,
+          OP.DEL_NOTE,
+          OP.PATCH_PARAGRAPH)));
+
+  private static final Gson gson = new Gson();
+  public static final Message EMPTY = new Message(null);
+  
+  public OP op;
+  public Map<String, Object> data = new HashMap<>();
+  public String ticket = "anonymous";
+  public String principal = "anonymous";
+  public String roles = "";
+
+  // Unique id generated from client side. to identify message.
+  // When message from server is response to the client request
+  // includes the msgId in response message, client can pair request and response message.
+  // When server send message that is not response to the client request, set null;
+  public String msgId = MSG_ID_NOT_DEFINED;
+  public static String MSG_ID_NOT_DEFINED = null;
+
+  public Message(OP op) {
+    this.op = op;
+  }
+
+  public Message withMsgId(String msgId) {
+    this.msgId = msgId;
+    return this;
+  }
+
+  public Message put(String k, Object v) {
+    data.put(k, v);
+    return this;
+  }
+
+  public Object get(String k) {
+    return data.get(k);
+  }
+
+  public static boolean isDisabledForRunningNotes(OP eventType) {
+    return disabledForRunningNoteMessages.contains(eventType);
+  }
+
+  public <T> T getType(String key) {
+    return (T) data.get(key);
+  }
+
+  public <T> T getType(String key, Logger LOG) {
+    try {
+      return getType(key);
+    } catch (ClassCastException e) {
+      LOG.error("Failed to get " + key + " from message (Invalid type). " , e);
+      return null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Message{");
+    sb.append("data=").append(data);
+    sb.append(", op=").append(op);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public String toJson() {
+    return gson.toJson(this);
+  }
+
+  public static Message fromJson(String json) {
+    return gson.fromJson(json, Message.class);
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
index 8a6d0d0..5cb8f20 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
@@ -14,32 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
 
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
 
 /**
- * RestartInterpreter rest api request message.
+ * Interface of how to handle websocket message sent from ZeppelinServer.
  */
-public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
+public interface MessageHandler {
 
-  String noteId;
+  void onMessage(String msg);
 
-  public RestartInterpreterRequest() {
-  }
-
-  public String getNoteId() {
-    return noteId;
-  }
-
-  public String toJson() {
-    return gson.toJson(this);
-  }
-
-  public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
-  }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
similarity index 55%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
index 8a6d0d0..ca7b881 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
@@ -14,32 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
+package org.apache.zeppelin.client.websocket;
 
-import org.apache.zeppelin.common.JsonSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * RestartInterpreter rest api request message.
+ * Simple implementation of AbstractMessageHandler which only print output.
  */
-public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
+public class SimpleMessageHandler extends AbstractMessageHandler {
 
-  String noteId;
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleMessageHandler.class);
 
-  public RestartInterpreterRequest() {
+  @Override
+  public void onStatementAppendOutput(String statementId, int index, String output) {
+    LOGGER.info("Append output, data: {}", output);
   }
 
-  public String getNoteId() {
-    return noteId;
-  }
-
-  public String toJson() {
-    return gson.toJson(this);
-  }
-
-  public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
+  @Override
+  public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+    LOGGER.info("Update output, type: {}, data: {}", type, output);
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
index 8a6d0d0..2ee9e6b 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
@@ -14,32 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.rest.message;
 
-import com.google.gson.Gson;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
 
 /**
- * RestartInterpreter rest api request message.
+ * MessageHandler for each statement.
  */
-public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
-
-  String noteId;
-
-  public RestartInterpreterRequest() {
-  }
-
-  public String getNoteId() {
-    return noteId;
-  }
-
-  public String toJson() {
-    return gson.toJson(this);
-  }
+public interface StatementMessageHandler {
+
+  /**
+   * Invoked when there's new statement output appended.
+   *
+   * @param statementId
+   * @param index
+   * @param output
+   */
+  void onStatementAppendOutput(String statementId, int index, String output);
+
+  /**
+   * Invoked when statement's output is updated.
+   *
+   * @param statementId
+   * @param index
+   * @param type
+   * @param output
+   */
+  void onStatementUpdateOutput(String statementId, int index, String type, String output);
 
-  public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
-  }
 }
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java
new file mode 100644
index 0000000..4a72c0a
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Represent websocket client.
+ *
+ */
+@WebSocket(maxTextMessageSize = 10 * 1000 * 1024 )
+public class ZeppelinWebSocketClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinWebSocketClient.class);
+  private static final Gson GSON = new Gson();
+
+  private CountDownLatch connectLatch = new CountDownLatch(1);
+  private CountDownLatch closeLatch = new CountDownLatch(1);
+
+  private Session session;
+  private MessageHandler messageHandler;
+  private WebSocketClient wsClient;
+
+  public ZeppelinWebSocketClient(MessageHandler messageHandler) {
+    this.messageHandler = messageHandler;
+  }
+
+  public void connect(String url) throws Exception {
+    this.wsClient = new WebSocketClient();
+    wsClient.start();
+    URI echoUri = new URI(url);
+    ClientUpgradeRequest request = new ClientUpgradeRequest();
+    request.setHeader("Origin", "*");
+    wsClient.connect(this, echoUri, request);
+    connectLatch.await();
+    LOGGER.info("WebSocket connect established");
+  }
+
+  public void addStatementMessageHandler(String statementId,
+                                         StatementMessageHandler statementMessageHandler) throws Exception {
+    if (messageHandler instanceof CompositeMessageHandler) {
+      ((CompositeMessageHandler) messageHandler).addStatementMessageHandler(statementId, statementMessageHandler);
+    } else {
+      throw new Exception("StatementMessageHandler is only supported by: "
+              + CompositeMessageHandler.class.getSimpleName());
+    }
+  }
+
+  public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
+    return this.closeLatch.await(duration, unit);
+  }
+
+  @OnWebSocketClose
+  public void onClose(int statusCode, String reason) {
+    LOGGER.info("Connection closed, statusCode: {} - reason: {}", statusCode, reason);
+    this.session = null;
+    this.closeLatch.countDown();
+  }
+
+  @OnWebSocketConnect
+  public void onConnect(Session session) {
+    LOGGER.info("Got connect: {}", session.getRemote());
+    this.session = session;
+    connectLatch.countDown();
+  }
+
+  @OnWebSocketMessage
+  public void onText(Session session, String message) throws IOException {
+    messageHandler.onMessage(message);
+  }
+
+  @OnWebSocketError
+  public void onError(Throwable cause) {
+    LOGGER.info("WebSocket Error: " + cause.getMessage());
+    cause.printStackTrace(System.out);
+  }
+
+  public void send(Message message) throws IOException {
+    session.getRemote().sendString(GSON.toJson(message));
+  }
+
+  public CountDownLatch getConnectLatch() {
+    return connectLatch;
+  }
+
+  public void stop() throws Exception {
+    if (this.wsClient != null) {
+      this.wsClient.stop();
+    }
+  }
+
+}
diff --git a/zeppelin-integration/src/test/resources/log4j.properties b/zeppelin-integration/src/test/resources/log4j.properties
index 8368993..c666d57 100644
--- a/zeppelin-integration/src/test/resources/log4j.properties
+++ b/zeppelin-integration/src/test/resources/log4j.properties
@@ -41,6 +41,3 @@ log4j.logger.DataNucleus.Datastore=ERROR
 
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
-
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.spark=DEBUG
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 71ab23f..5958858 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -45,6 +45,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-client</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore-nio</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-zengine</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
new file mode 100644
index 0000000..ac9c5d7
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZSession;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ZSessionIntegrationTest extends AbstractTestRestApi {
+
+  private static Notebook notebook;
+  private static String sparkHome;
+  private static String flinkHome;
+
+  private ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+            "helium");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+    AbstractTestRestApi.startUp(ZSessionIntegrationTest.class.getSimpleName());
+    notebook = TestUtils.getInstance(Notebook.class);
+    sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
+    flinkHome = DownloadUtils.downloadFlink("1.10.1");
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
+
+  @Test
+  public void testZSession_Shell() throws Exception {
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("sh")
+            .build();
+
+    try {
+      session.start();
+      assertNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      Note note = notebook.getNote(session.getNoteId());
+      assertEquals(2, note.getParagraphCount());
+      assertTrue(note.getParagraph(0).getText(), note.getParagraph(0).getText().startsWith("%sh.conf"));
+
+      ExecuteResult result = session.execute("pwd");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+
+      result = session.execute("invalid_command");
+      assertEquals(Status.ERROR, result.getStatus());
+      assertEquals(2, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("command not found"));
+      assertEquals("TEXT", result.getResults().get(1).getType());
+      assertTrue(result.getResults().get(1).getData(), result.getResults().get(1).getData().contains("ExitValue"));
+
+      assertEquals(4, note.getParagraphCount());
+      assertEquals("%sh\ninvalid_command", note.getParagraph(3).getText());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Shell_Submit() throws Exception {
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("sh")
+            .build();
+
+    try {
+      session.start();
+      assertNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      Note note = notebook.getNote(session.getNoteId());
+      assertEquals(2, note.getParagraphCount());
+      assertTrue(note.getParagraph(0).getText(), note.getParagraph(0).getText().startsWith("%sh.conf"));
+
+      ExecuteResult result = session.submit("sleep 10\npwd");
+      assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+
+      result = session.submit("invalid_command");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(Status.ERROR, result.getStatus());
+      assertEquals(2, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("command not found"));
+      assertEquals("TEXT", result.getResults().get(1).getType());
+      assertTrue(result.getResults().get(1).getData(), result.getResults().get(1).getData().contains("ExitValue"));
+
+      assertEquals(4, note.getParagraphCount());
+      assertEquals("%sh\ninvalid_command", note.getParagraph(3).getText());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Spark() throws Exception {
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("SPARK_HOME", sparkHome);
+    intpProperties.put("spark.master", "local[*]");
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("spark")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start();
+      assertNotNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // scala
+      ExecuteResult result = session.execute("sc.version");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+      assertEquals(0, result.getJobUrls().size());
+
+      // pyspark
+      result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+      assertEquals(Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertEquals(
+              "+---+---+\n" +
+              "| _1| _2|\n" +
+              "+---+---+\n" +
+              "|  1|  a|\n" +
+              "|  2|  b|\n" +
+              "+---+---+", result.getResults().get(0).getData().trim());
+      assertTrue(result.getJobUrls().size() > 0);
+
+      // sparkr
+      result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
+      assertEquals(Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
+      assertEquals(2, result.getJobUrls().size());
+
+      // spark sql
+      result = session.execute("sql", "select * from df");
+      assertEquals(Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1\ta\n2\tb\n"));
+      assertTrue(result.getJobUrls().size() > 0);
+
+      // spark invalid sql
+      result = session.execute("sql", "select * from unknown_table");
+      assertEquals(Status.ERROR, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+      assertEquals(0, result.getJobUrls().size());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Spark_Submit() throws Exception {
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("SPARK_HOME", sparkHome);
+    intpProperties.put("spark.master", "local[*]");
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("spark")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start();
+      assertNotNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // scala
+      ExecuteResult result = session.submit("sc.version");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+      assertEquals(0, result.getJobUrls().size());
+
+      // pyspark
+      result = session.submit("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertEquals(
+              "+---+---+\n" +
+              "| _1| _2|\n" +
+              "+---+---+\n" +
+              "|  1|  a|\n" +
+              "|  2|  b|\n" +
+              "+---+---+", result.getResults().get(0).getData().trim());
+      assertTrue(result.getJobUrls().size() > 0);
+
+      // sparkr
+      result = session.submit("r", "df <- as.DataFrame(faithful)\nhead(df)");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
+      assertEquals(2, result.getJobUrls().size());
+
+      // spark sql
+      result = session.submit("sql", "select * from df");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1\ta\n2\tb\n"));
+      assertTrue(result.getJobUrls().size() > 0);
+
+      // spark invalid sql
+      result = session.submit("sql", "select * from unknown_table");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(Status.ERROR, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+      assertEquals(0, result.getJobUrls().size());
+
+      // cancel
+      result = session.submit("sc.range(1,100).map(e=>{Thread.sleep(1000);e}).collect()");
+      assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+      result = session.waitUntilRunning(result.getStatementId());
+      session.cancel(result.getStatementId());
+      assertEquals(result.toString(), Status.RUNNING, result.getStatus());
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.ABORT, result.getStatus());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Flink() throws Exception {
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("FLINK_HOME", flinkHome);
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("flink")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start();
+      assertNotNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // scala
+      ExecuteResult result = session.execute("val data = benv.fromElements(1, 2, 3)\ndata.collect()");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1, 2, 3"));
+
+      // sql
+      result = session.execute(getInitStreamScript(200));
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      Map<String, String> localProperties = new HashMap<>();
+      localProperties.put("type", "update");
+      result = session.execute("ssql", localProperties, "select url, count(1) as pv from log group by url");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Flink_Submit() throws Exception {
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("FLINK_HOME", flinkHome);
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("flink")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start(new SimpleMessageHandler());
+      assertNotNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // scala
+      ExecuteResult result = session.submit("val data = benv.fromElements(1, 2, 3)\ndata.collect()");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1, 2, 3"));
+
+      // sql
+      result = session.submit(getInitStreamScript(200));
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      Map<String, String> localProperties = new HashMap<>();
+      localProperties.put("type", "update");
+      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+      assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+
+      // cancel
+      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+      assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+      result = session.waitUntilRunning(result.getStatementId());
+      session.cancel(result.getStatementId());
+      assertEquals(result.toString(), Status.RUNNING, result.getStatus());
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.ABORT, result.getStatus());
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test
+  public void testZSession_Python() throws Exception {
+    Map<String, String> intpProperties = new HashMap<>();
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("python")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start(new SimpleMessageHandler());
+      assertNull(session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // python
+//      ExecuteResult result = session.execute("1+1");
+//      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+//      assertEquals(1, result.getResults().size());
+//      assertEquals("TEXT", result.getResults().get(0).getType());
+//      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2"));
+//
+//      // python
+//      result = session.execute("1/0");
+//      assertEquals(result.toString(), Status.ERROR, result.getStatus());
+//      assertEquals(1, result.getResults().size());
+//      assertEquals("TEXT", result.getResults().get(0).getType());
+//      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("ZeroDivisionError"));
+
+      // for loop
+      ExecuteResult result = session.execute("import time\n" +
+                               "for i in range(1,10):\n" +
+                               "\tprint(i)\n" +
+                               "\ttime.sleep(1)");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TEXT", result.getResults().get(0).getType());
+    } finally {
+      session.stop();
+    }
+  }
+
+  //@Test
+  public void testZSession_Jdbc() throws Exception {
+
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("default.driver", "com.mysql.jdbc.Driver");
+    intpProperties.put("default.url", "jdbc:mysql://localhost:3306/");
+    intpProperties.put("default.user", "root");
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("jdbc")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start();
+      assertEquals("", session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // show databases
+      ExecuteResult result = session.execute("show databases");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Database"));
+
+      // select statement
+      result = session.execute("SELECT 1 as c1, 2 as c2");
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertEquals("c1\tc2\n1\t2\n", result.getResults().get(0).getData());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  //@Test
+  public void testZSession_Jdbc_Submit() throws Exception {
+
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("default.driver", "com.mysql.jdbc.Driver");
+    intpProperties.put("default.url", "jdbc:mysql://localhost:3306/");
+    intpProperties.put("default.user", "root");
+
+    ZSession session = ZSession.builder()
+            .setClientConfig(clientConfig)
+            .setInterpreter("jdbc")
+            .setIntpProperties(intpProperties)
+            .build();
+
+    try {
+      session.start();
+      assertEquals("", session.getWeburl());
+      assertNotNull(session.getNoteId());
+
+      // show databases
+      ExecuteResult result = session.submit("show databases");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Database"));
+
+      // select statement
+      result = session.submit("SELECT 1 as c1, 2 as c2");
+      result = session.waitUntilFinished(result.getStatementId());
+      assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+      assertEquals(1, result.getResults().size());
+      assertEquals("TABLE", result.getResults().get(0).getType());
+      assertEquals("c1\tc2\n1\t2\n", result.getResults().get(0).getData());
+
+    } finally {
+      session.stop();
+    }
+  }
+
+  public static String getInitStreamScript(int sleep_interval) throws IOException {
+    return IOUtils.toString(ZSessionIntegrationTest.class.getResource("/init_stream.scala"))
+            .replace("{{sleep_interval}}", sleep_interval + "");
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
new file mode 100644
index 0000000..820ebff
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZeppelinClient;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ZeppelinClientIntegrationTest extends AbstractTestRestApi {
+  private static Notebook notebook;
+
+  private static ClientConfig clientConfig;
+  private static ZeppelinClient zeppelinClient;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+            "helium");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+    AbstractTestRestApi.startUp(ZeppelinClientIntegrationTest.class.getSimpleName());
+    notebook = TestUtils.getInstance(Notebook.class);
+
+    clientConfig = new ClientConfig("http://localhost:8080");
+    zeppelinClient = new ZeppelinClient(clientConfig);
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
+
+  @Test
+  public void testZeppelinVersion() throws Exception {
+    String version = zeppelinClient.getVersion();
+    LOG.info("Zeppelin version: " + version);
+  }
+
+  @Test
+  public void testNoteOperation() throws Exception {
+    String noteId = zeppelinClient.createNote("/project_1/note1");
+    assertNotNull(notebook.getNote(noteId));
+
+    // create duplicated note
+    try {
+      zeppelinClient.createNote("/project_1/note1");
+      fail("Should fail to create duplicated note");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("existed"));
+    }
+
+    // query NoteResult
+    NoteResult noteResult = zeppelinClient.queryNoteResult(noteId);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    // note is created with 0 paragraph.
+    assertEquals(0, noteResult.getParagraphResultList().size());
+
+    // query non-existed note
+    try {
+      zeppelinClient.queryNoteResult("unknown-noteId");
+      fail("Should fail to query non-existed note");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+    }
+
+    zeppelinClient.deleteNote(noteId);
+
+    // deleting the same note again will fail
+    try {
+      zeppelinClient.deleteNote(noteId);
+      fail("Should fail to delete non-existed note");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+    }
+  }
+
+  @Test
+  public void testExecuteParagraph() throws Exception {
+    // run paragraph succeed
+    String noteId = zeppelinClient.createNote("/test/note_1");
+    String paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+    ParagraphResult paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello world\n", paragraphResult.getResults().get(0).getData());
+
+    // run paragraph succeed with dynamic forms
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello ${name=abc}'");
+    paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(paragraphResult.toString(), Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello abc\n", paragraphResult.getResults().get(0).getData());
+
+    // run paragraph succeed with parameters
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("name", "zeppelin");
+    paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId, parameters);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", paragraphResult.getResults().get(0).getData());
+
+    // run paragraph failed
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+    paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.ERROR, paragraphResult.getStatus());
+    assertEquals(2, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("command not found"));
+    assertEquals("TEXT", paragraphResult.getResults().get(1).getType());
+    assertTrue(paragraphResult.getResults().get(1).getData(), paragraphResult.getResults().get(1).getData().contains("ExitValue"));
+
+    // run non-existed interpreter
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%non_existed hello");
+    paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.ERROR, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("Interpreter non_existed not found"));
+
+    // run non-existed paragraph
+    try {
+      zeppelinClient.executeParagraph(noteId, "invalid_paragraph_id");
+      fail("Should fail to run non-existed paragraph");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such paragraph"));
+    }
+  }
+
+  @Test
+  public void testSubmitParagraph() throws Exception {
+    String noteId = zeppelinClient.createNote("/test/note_2");
+    String paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+    zeppelinClient.submitParagraph(noteId, paragraphId);
+    ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello world\n", paragraphResult.getResults().get(0).getData());
+
+    // submit paragraph succeed with dynamic forms
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello ${name=abc}'");
+    zeppelinClient.submitParagraph(noteId, paragraphId);
+    paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(paragraphResult.toString(), Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello abc\n", paragraphResult.getResults().get(0).getData());
+
+    // run paragraph succeed with parameters
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("name", "zeppelin");
+    zeppelinClient.submitParagraph(noteId, paragraphId, parameters);
+    paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.FINISHED, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", paragraphResult.getResults().get(0).getData());
+
+    // run paragraph failed
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+    zeppelinClient.submitParagraph(noteId, paragraphId);
+    paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.ERROR, paragraphResult.getStatus());
+    assertEquals(2, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("command not found"));
+    assertEquals("TEXT", paragraphResult.getResults().get(1).getType());
+    assertTrue(paragraphResult.getResults().get(1).getData(), paragraphResult.getResults().get(1).getData().contains("ExitValue"));
+
+    // run non-existed interpreter
+    paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%non_existed hello");
+    zeppelinClient.submitParagraph(noteId, paragraphId);
+    paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+    assertEquals(paragraphId, paragraphResult.getParagraphId());
+    assertEquals(Status.ERROR, paragraphResult.getStatus());
+    assertEquals(1, paragraphResult.getResults().size());
+    assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+    assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("Interpreter non_existed not found"));
+
+    // run non-existed paragraph
+    try {
+      zeppelinClient.submitParagraph(noteId, "invalid_paragraph_id");
+      fail("Should fail to run non-existed paragraph");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such paragraph"));
+    }
+  }
+
+  @Test
+  public void testExecuteNote() throws Exception {
+    try {
+      zeppelinClient.executeNote("unknown_id");
+      fail("Should fail to submit non-existed note");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+    }
+    String noteId = zeppelinClient.createNote("/test/note_3");
+    String p0Id = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+    NoteResult noteResult = zeppelinClient.executeNote(noteId, new HashMap<>());
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    ParagraphResult p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello world\n", p0.getResults().get(0).getData());
+
+    // update paragraph with dynamic forms
+    zeppelinClient.updateParagraph(noteId, p0Id, "run sh", "%sh echo 'hello ${name=abc}'");
+    noteResult = zeppelinClient.executeNote(noteId);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello abc\n", p0.getResults().get(0).getData());
+
+    // execute paragraph with parameters
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("name", "zeppelin");
+    noteResult = zeppelinClient.executeNote(noteId, parameters);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+    zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+    zeppelinClient.addParagraph(noteId, "run sh", "%sh pwd");
+    noteResult = zeppelinClient.executeNote(noteId, parameters);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(3, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+    ParagraphResult p1 = noteResult.getParagraphResultList().get(1);
+    assertEquals(Status.ERROR, p1.getStatus());
+    assertEquals("TEXT", p1.getResults().get(0).getType());
+    assertTrue(p1.getResults().get(0).getData(), p1.getResults().get(0).getData().contains("command not found"));
+    assertEquals("TEXT", p1.getResults().get(1).getType());
+    assertTrue(p1.getResults().get(1).getData(), p1.getResults().get(1).getData().contains("ExitValue"));
+
+    // p2 will be skipped because p1 fails.
+    ParagraphResult p2 = noteResult.getParagraphResultList().get(2);
+    assertEquals(Status.READY, p2.getStatus());
+    assertEquals(0, p2.getResults().size());
+  }
+
+  @Test
+  public void testSubmitNote() throws Exception {
+    try {
+      zeppelinClient.submitNote("unknown_id");
+      fail("Should fail to submit non-existed note");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+    }
+    String noteId = zeppelinClient.createNote("/test/note_4");
+    String p0Id = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+    zeppelinClient.submitNote(noteId);
+    NoteResult noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    ParagraphResult p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello world\n", p0.getResults().get(0).getData());
+
+    // update paragraph with dynamic forms
+    zeppelinClient.updateParagraph(noteId, p0Id, "run sh", "%sh sleep 5\necho 'hello ${name=abc}'");
+    noteResult = zeppelinClient.submitNote(noteId);
+    assertEquals(true, noteResult.isRunning());
+    noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello abc\n", p0.getResults().get(0).getData());
+
+    // execute paragraph with parameters
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("name", "zeppelin");
+    noteResult = zeppelinClient.executeNote(noteId, parameters);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(1, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+    zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+    zeppelinClient.addParagraph(noteId, "run sh", "%sh pwd");
+    zeppelinClient.submitNote(noteId, parameters);
+    noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+    assertEquals(noteId, noteResult.getNoteId());
+    assertEquals(false, noteResult.isRunning());
+    assertEquals(3, noteResult.getParagraphResultList().size());
+    p0 = noteResult.getParagraphResultList().get(0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+    assertEquals(1, p0.getResults().size());
+    assertEquals("TEXT", p0.getResults().get(0).getType());
+    assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+    ParagraphResult p1 = noteResult.getParagraphResultList().get(1);
+    assertEquals(Status.ERROR, p1.getStatus());
+    assertEquals("TEXT", p1.getResults().get(0).getType());
+    assertTrue(p1.getResults().get(0).getData(), p1.getResults().get(0).getData().contains("command not found"));
+    assertEquals("TEXT", p1.getResults().get(1).getType());
+    assertTrue(p1.getResults().get(1).getData(), p1.getResults().get(1).getData().contains("ExitValue"));
+
+    // p2 will be skipped because p1 fails.
+    ParagraphResult p2 = noteResult.getParagraphResultList().get(2);
+    assertEquals(Status.READY, p2.getStatus());
+    assertEquals(0, p2.getResults().size());
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java
new file mode 100644
index 0000000..9b0fc93
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZeppelinClient;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ZeppelinClientWithAuthIntegrationTest extends AbstractTestRestApi {
+  private static Notebook notebook;
+
+  private static ClientConfig clientConfig;
+  private static ZeppelinClient zeppelinClient;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+            "helium");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+    AbstractTestRestApi.startUpWithAuthenticationEnable(ZeppelinClientWithAuthIntegrationTest.class.getSimpleName());
+    notebook = TestUtils.getInstance(Notebook.class);
+
+    clientConfig = new ClientConfig("http://localhost:8080");
+    zeppelinClient = new ZeppelinClient(clientConfig);
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
+
+  @Test
+  public void testZeppelinVersion() throws Exception {
+    String version = zeppelinClient.getVersion();
+    LOG.info("Zeppelin version: " + version);
+  }
+
+  @Test
+  public void testCreateNoteWithoutLogin() throws Exception {
+    try {
+      zeppelinClient.createNote("/note_1");
+      fail("Should fail due to not login");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("login first"));
+    }
+  }
+
+  @Test
+  public void testCreateNoteAfterLogin() throws Exception {
+    zeppelinClient.login("admin", "password1");
+    zeppelinClient.createNote("/note_2");
+  }
+
+  @Test
+  public void testLoginFailed() throws Exception {
+    // wrong password
+    try {
+      zeppelinClient.login("admin", "invalid_password");
+      fail("Should fail to login");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Forbidden"));
+    }
+
+    // wrong username
+    try {
+      zeppelinClient.login("invalid_user", "password1");
+      fail("Should fail to login");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Forbidden"));
+    }
+  }
+}
+
diff --git a/zeppelin-interpreter-integration/src/test/resources/log4j.properties b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
index 773d0af..403c078 100644
--- a/zeppelin-interpreter-integration/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 #log4j.appender.stdout.layout.ConversionPattern=
 #%5p [%t] (%F:%L) - %m%n
 #%-4r [%t] %-5p %c %x - %m%n
@@ -42,3 +42,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 log4j.logger.org.apache.hadoop=WARN
+
+log4j.logger.org.apache.zeppelin.client=INFO
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
index 9474560..4800619 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
@@ -18,22 +18,24 @@
 package org.apache.zeppelin.interpreter;
 
 /**
- * Context info about running interpreter. This is used for deciding which interpreter binding
- * mode to use.
+ * Context info about running interpreter. This is used for deciding bind to
+ * which interpreter progress(InterpreterGroup)
  */
 public class ExecutionContext {
 
   private final String user;
   private final String noteId;
+  private final String interpreterGroupId;
   private final String defaultInterpreterGroup;
   private final boolean inIsolatedMode;
   // When is the execution triggered, e.g. when the cron job is triggered or when the rest api is triggered.
   private final String startTime;
 
-  public ExecutionContext(String user, String noteId, String defaultInterpreterGroup,
+  public ExecutionContext(String user, String noteId, String interpreterGroupId, String defaultInterpreterGroup,
                           boolean inIsolatedMode, String startTime) {
     this.user = user;
     this.noteId = noteId;
+    this.interpreterGroupId = interpreterGroupId;
     this.defaultInterpreterGroup = defaultInterpreterGroup;
     this.inIsolatedMode = inIsolatedMode;
     this.startTime = startTime;
@@ -47,6 +49,10 @@ public class ExecutionContext {
     return noteId;
   }
 
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
   public String getDefaultInterpreterGroup() {
     return defaultInterpreterGroup;
   }
@@ -64,6 +70,7 @@ public class ExecutionContext {
     return "ExecutionContext{" +
             "user='" + user + '\'' +
             ", noteId='" + noteId + '\'' +
+            ", interpreterGroupId='" + interpreterGroupId + '\'' +
             ", defaultInterpreterGroup='" + defaultInterpreterGroup + '\'' +
             ", inIsolatedMode=" + inIsolatedMode +
             ", startTime=" + startTime +
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
index bf8be9c..86956d2 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
@@ -23,6 +23,7 @@ package org.apache.zeppelin.interpreter;
 public class ExecutionContextBuilder {
   private String user;
   private String noteId;
+  private String interpreterGroupId;
   private String defaultInterpreterGroup = "";
   private boolean inIsolatedMode = false;
   private String startTime = "";
@@ -52,7 +53,13 @@ public class ExecutionContextBuilder {
     return this;
   }
 
+  public ExecutionContextBuilder setInterpreterGroupId(String interpreterGroupId) {
+    this.interpreterGroupId = interpreterGroupId;
+    return this;
+  }
+
   public ExecutionContext createExecutionContext() {
-    return new ExecutionContext(user, noteId, defaultInterpreterGroup, inIsolatedMode, startTime);
+    return new ExecutionContext(user, noteId, interpreterGroupId, defaultInterpreterGroup, inIsolatedMode, startTime);
   }
+
 }
\ No newline at end of file
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 36ec1bd..54769be 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -69,7 +69,7 @@ import java.util.Set;
 @Singleton
 public class InterpreterRestApi {
 
-  private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterRestApi.class);
 
   private final AuthenticationService authenticationService;
   private final AuthorizationService authorizationService;
@@ -116,7 +116,7 @@ public class InterpreterRestApi {
         return new JsonResponse<>(Status.OK, "", setting).build();
       }
     } catch (NullPointerException e) {
-      logger.error("Exception in InterpreterRestApi while creating ", e);
+      LOGGER.error("Exception in InterpreterRestApi while creating ", e);
       return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
           ExceptionUtils.getStackTrace(e)).build();
     }
@@ -141,10 +141,10 @@ public class InterpreterRestApi {
       InterpreterSetting interpreterSetting = interpreterSettingManager
           .createNewSetting(request.getName(), request.getGroup(), request.getDependencies(),
               request.getOption(), request.getProperties());
-      logger.info("new setting created with {}", interpreterSetting.getId());
+      LOGGER.info("new setting created with {}", interpreterSetting.getId());
       return new JsonResponse<>(Status.OK, "", interpreterSetting).build();
     } catch (IOException e) {
-      logger.error("Exception in InterpreterRestApi while creating ", e);
+      LOGGER.error("Exception in InterpreterRestApi while creating ", e);
       return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
           .build();
     }
@@ -154,7 +154,7 @@ public class InterpreterRestApi {
   @Path("setting/{settingId}")
   @ZeppelinApi
   public Response updateSetting(String message, @PathParam("settingId") String settingId) {
-    logger.info("Update interpreterSetting {}", settingId);
+    LOGGER.info("Update interpreterSetting {}", settingId);
 
     try {
       UpdateInterpreterSettingRequest request =
@@ -163,11 +163,11 @@ public class InterpreterRestApi {
           .setPropertyAndRestart(settingId, request.getOption(), request.getProperties(),
               request.getDependencies());
     } catch (InterpreterException e) {
-      logger.error("Exception in InterpreterRestApi while updateSetting ", e);
+      LOGGER.error("Exception in InterpreterRestApi while updateSetting ", e);
       return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
           .build();
     } catch (IOException e) {
-      logger.error("Exception in InterpreterRestApi while updateSetting ", e);
+      LOGGER.error("Exception in InterpreterRestApi while updateSetting ", e);
       return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
           ExceptionUtils.getStackTrace(e)).build();
     }
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
   @Path("setting/{settingId}")
   @ZeppelinApi
   public Response removeSetting(@PathParam("settingId") String settingId) throws IOException {
-    logger.info("Remove interpreterSetting {}", settingId);
+    LOGGER.info("Remove interpreterSetting {}", settingId);
     interpreterSettingManager.remove(settingId);
     return new JsonResponse(Status.OK).build();
   }
@@ -197,7 +197,7 @@ public class InterpreterRestApi {
   @Path("setting/restart/{settingId}")
   @ZeppelinApi
   public Response restartSetting(String message, @PathParam("settingId") String settingId) {
-    logger.info("Restart interpreterSetting {}, msg={}", settingId, message);
+    LOGGER.info("Restart interpreterSetting {}, msg={}", settingId, message);
 
     InterpreterSetting setting = interpreterSettingManager.get(settingId);
     try {
@@ -224,7 +224,7 @@ public class InterpreterRestApi {
         }
       }
     } catch (InterpreterException e) {
-      logger.error("Exception in InterpreterRestApi while restartSetting ", e);
+      LOGGER.error("Exception in InterpreterRestApi while restartSetting ", e);
       return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
           .build();
     }
@@ -268,9 +268,9 @@ public class InterpreterRestApi {
       Repository request = Repository.fromJson(message);
       interpreterSettingManager.addRepository(request.getId(), request.getUrl(),
           request.isSnapshot(), request.getAuthentication(), request.getProxy());
-      logger.info("New repository {} added", request.getId());
+      LOGGER.info("New repository {} added", request.getId());
     } catch (Exception e) {
-      logger.error("Exception in InterpreterRestApi while adding repository ", e);
+      LOGGER.error("Exception in InterpreterRestApi while adding repository ", e);
       return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
           ExceptionUtils.getStackTrace(e)).build();
     }
@@ -286,11 +286,11 @@ public class InterpreterRestApi {
   @Path("repository/{repoId}")
   @ZeppelinApi
   public Response removeRepository(@PathParam("repoId") String repoId) {
-    logger.info("Remove repository {}", repoId);
+    LOGGER.info("Remove repository {}", repoId);
     try {
       interpreterSettingManager.removeRepository(repoId);
     } catch (Exception e) {
-      logger.error("Exception in InterpreterRestApi while removing repository ", e);
+      LOGGER.error("Exception in InterpreterRestApi while removing repository ", e);
       return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
           ExceptionUtils.getStackTrace(e)).build();
     }
@@ -311,7 +311,7 @@ public class InterpreterRestApi {
   @Path("install")
   @ZeppelinApi
   public Response installInterpreter(@NotNull String message) {
-    logger.info("Install interpreter: {}", message);
+    LOGGER.info("Install interpreter: {}", message);
     InterpreterInstallationRequest request = InterpreterInstallationRequest.fromJson(message);
 
     try {
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index 52f4acf..3009676 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -37,6 +37,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
@@ -128,7 +129,7 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response getNotePermissions(@PathParam("noteId") String noteId) throws IOException {
     checkIfUserIsAnon(getBlockNotAuthenticatedUserErrorMsg());
     checkIfUserCanRead(noteId,
-        "Insufficient privileges you cannot get the list of permissions for this note");
+            "Insufficient privileges you cannot get the list of permissions for this note");
     HashMap<String, Set<String>> permissionsMap = new HashMap<>();
     permissionsMap.put("owners", authorizationService.getOwners(noteId));
     permissionsMap.put("readers", authorizationService.getReaders(noteId));
@@ -241,6 +242,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response putNotePermissions(@PathParam("noteId") String noteId, String req)
       throws IOException {
+
     String principal = authenticationService.getPrincipal();
     Set<String> roles = authenticationService.getAssociatedRoles();
     HashSet<String> userAndRoles = new HashSet<>();
@@ -249,11 +251,11 @@ public class NotebookRestApi extends AbstractRestApi {
 
     checkIfUserIsAnon(getBlockNotAuthenticatedUserErrorMsg());
     checkIfUserIsOwner(noteId,
-        ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId)));
+            ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId)));
 
     HashMap<String, HashSet<String>> permMap =
-        GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
-        }.getType());
+            GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
+            }.getType());
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
 
@@ -317,7 +319,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getNoteList() throws IOException {
     List<NoteInfo> notesInfo = notebookService.listNotesInfo(false, getServiceContext(),
-        new RestServiceCallback<List<NoteInfo>>());
+            new RestServiceCallback<List<NoteInfo>>());
     return new JsonResponse<>(Status.OK, "", notesInfo).build();
   }
 
@@ -333,7 +335,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getNote(@PathParam("noteId") String noteId) throws IOException {
     Note note =
-        notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback());
+            notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback());
     return new JsonResponse<>(Status.OK, "", note).build();
   }
 
@@ -365,13 +367,9 @@ public class NotebookRestApi extends AbstractRestApi {
   @Path("import")
   @ZeppelinApi
   public Response importNote(@QueryParam("notePath") String notePath, String noteJson) throws IOException {
-    try {
-      Note note = notebookService.importNote(notePath, noteJson, getServiceContext(),
-              new RestServiceCallback());
-      return new JsonResponse<>(Status.OK, "", note.getId()).build();
-    } catch (IOException e) {
-      return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
-    }
+    Note note = notebookService.importNote(notePath, noteJson, getServiceContext(),
+            new RestServiceCallback());
+    return new JsonResponse<>(Status.OK, "", note.getId()).build();
   }
 
   /**
@@ -385,13 +383,18 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response createNote(String message) throws IOException {
     String user = authenticationService.getPrincipal();
-    LOGGER.info("Create new note by JSON {}", message);
+    LOGGER.info("Creating new note by JSON {}", message);
     NewNoteRequest request = NewNoteRequest.fromJson(message);
+    String defaultInterpreterGroup = request.getDefaultInterpreterGroup();
+    if (StringUtils.isBlank(defaultInterpreterGroup)) {
+      defaultInterpreterGroup = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
+    }
     Note note = notebookService.createNote(
-        request.getName(),
-        zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT),
-        getServiceContext(),
-        new RestServiceCallback<>());
+            request.getName(),
+            defaultInterpreterGroup,
+            false,
+            getServiceContext(),
+            new RestServiceCallback<>());
     AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
     if (request.getParagraphs() != null) {
       for (NewParagraphRequest paragraphRequest : request.getParagraphs()) {
@@ -399,7 +402,7 @@ public class NotebookRestApi extends AbstractRestApi {
         initParagraph(p, paragraphRequest, user);
       }
     }
-    return new JsonResponse<>(Status.OK, "", note.getId()).build();
+    return new JsonResponse(Status.OK, "", note.getId()).build();
   }
 
   /**
@@ -415,13 +418,13 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response deleteNote(@PathParam("noteId") String noteId) throws IOException {
     LOGGER.info("Delete note {} ", noteId);
     notebookService.removeNote(noteId,
-        getServiceContext(),
-        new RestServiceCallback<String>() {
-          @Override
-          public void onSuccess(String message, ServiceContext context) {
-            notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
-          }
-        });
+            getServiceContext(),
+            new RestServiceCallback<String>() {
+              @Override
+              public void onSuccess(String message, ServiceContext context) {
+                notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+              }
+            });
 
     return new JsonResponse<>(Status.OK, "").build();
   }
@@ -440,6 +443,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response cloneNote(@PathParam("noteId") String noteId, String message)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Clone note by JSON {}", message);
     checkIfUserCanWrite(noteId, "Insufficient privileges you cannot clone this note");
     NewNoteRequest request = NewNoteRequest.fromJson(message);
@@ -449,13 +453,13 @@ public class NotebookRestApi extends AbstractRestApi {
     }
     AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
     Note newNote = notebookService.cloneNote(noteId, newNoteName, getServiceContext(),
-        new RestServiceCallback<Note>(){
-          @Override
-          public void onSuccess(Note newNote, ServiceContext context) throws IOException {
-            notebookServer.broadcastNote(newNote);
-            notebookServer.broadcastNoteList(subject, context.getUserAndRoles());
-          }
-        });
+            new RestServiceCallback<Note>() {
+              @Override
+              public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+                notebookServer.broadcastNote(newNote);
+                notebookServer.broadcastNoteList(subject, context.getUserAndRoles());
+              }
+            });
     return new JsonResponse<>(Status.OK, "", newNote.getId()).build();
   }
 
@@ -471,6 +475,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response renameNote(@PathParam("noteId") String noteId,
                              String message) throws IOException {
+
     LOGGER.info("Rename note by JSON {}", message);
     RenameNoteRequest request = GSON.fromJson(message, RenameNoteRequest.class);
     String newName = request.getName();
@@ -479,13 +484,13 @@ public class NotebookRestApi extends AbstractRestApi {
       throw new BadRequestException("name can not be empty");
     }
     notebookService.renameNote(noteId, request.getName(), false, getServiceContext(),
-        new RestServiceCallback<Note>(){
-          @Override
-          public void onSuccess(Note note, ServiceContext context) throws IOException {
-            notebookServer.broadcastNote(note);
-            notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
-          }
-        });
+            new RestServiceCallback<Note>() {
+              @Override
+              public void onSuccess(Note note, ServiceContext context) throws IOException {
+                notebookServer.broadcastNote(note);
+                notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+              }
+            });
     return new JsonResponse(Status.OK, "").build();
   }
 
@@ -501,6 +506,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response insertParagraph(@PathParam("noteId") String noteId, String message)
       throws IOException {
+
     String user = authenticationService.getPrincipal();
     LOGGER.info("Insert paragraph {} {}", noteId, message);
 
@@ -535,6 +541,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getParagraph(@PathParam("noteId") String noteId,
                                @PathParam("paragraphId") String paragraphId) throws IOException {
+
     LOGGER.info("Get paragraph {} {}", noteId, paragraphId);
 
     Note note = notebook.getNote(noteId);
@@ -542,7 +549,6 @@ public class NotebookRestApi extends AbstractRestApi {
     checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this paragraph");
     Paragraph p = note.getParagraph(paragraphId);
     checkIfParagraphIsNotNull(p);
-
     return new JsonResponse<>(Status.OK, "", p).build();
   }
 
@@ -558,9 +564,9 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response updateParagraph(@PathParam("noteId") String noteId,
                                   @PathParam("paragraphId") String paragraphId,
                                   String message) throws IOException {
+
     String user = authenticationService.getPrincipal();
     LOGGER.info("{} will update paragraph {} {}", user, noteId, paragraphId);
-
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
     checkIfUserCanWrite(noteId, "Insufficient privileges you cannot update this paragraph");
@@ -595,6 +601,7 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response updateParagraphConfig(@PathParam("noteId") String noteId,
                                         @PathParam("paragraphId") String paragraphId,
                                         String message) throws IOException {
+
     String user = authenticationService.getPrincipal();
     LOGGER.info("{} will update paragraph config {} {}", user, noteId, paragraphId);
 
@@ -625,17 +632,17 @@ public class NotebookRestApi extends AbstractRestApi {
                                 @PathParam("paragraphId") String paragraphId,
                                 @PathParam("newIndex") String newIndex)
       throws IOException {
+
     LOGGER.info("Move paragraph {} {} {}", noteId, paragraphId, newIndex);
     notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex),
-        getServiceContext(),
-        new RestServiceCallback<Paragraph>() {
-          @Override
-          public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
-            notebookServer.broadcastNote(result.getNote());
-          }
-        });
+            getServiceContext(),
+            new RestServiceCallback<Paragraph>() {
+              @Override
+              public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+                notebookServer.broadcastNote(result.getNote());
+              }
+            });
     return new JsonResponse(Status.OK, "").build();
-
   }
 
   /**
@@ -650,18 +657,30 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response deleteParagraph(@PathParam("noteId") String noteId,
                                   @PathParam("paragraphId") String paragraphId) throws IOException {
+
     LOGGER.info("Delete paragraph {} {}", noteId, paragraphId);
     notebookService.removeParagraph(noteId, paragraphId, getServiceContext(),
-        new RestServiceCallback<Paragraph>() {
-          @Override
-          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
-            notebookServer.broadcastNote(p.getNote());
-          }
-        });
+            new RestServiceCallback<Paragraph>() {
+              @Override
+              public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+                notebookServer.broadcastNote(p.getNote());
+              }
+            });
 
     return new JsonResponse(Status.OK, "").build();
   }
 
+  @POST
+  @Path("{noteId}/paragraph/next")
+  public Response nextSessionParagraph(@PathParam("noteId") String noteId,
+                                       @QueryParam("maxParagraph") int maxParagraph) throws IOException {
+
+    Paragraph p = notebookService.getNextSessionParagraph(noteId, maxParagraph,
+            getServiceContext(),
+            new RestServiceCallback<>());
+    return new JsonResponse(Status.OK, p.getId()).build();
+  }
+
   /**
    * Clear result of all paragraphs REST API.
    *
@@ -675,7 +694,7 @@ public class NotebookRestApi extends AbstractRestApi {
       throws IOException {
     LOGGER.info("Clear all paragraph output of note {}", noteId);
     notebookService.clearAllParagraphOutput(noteId, getServiceContext(),
-        new RestServiceCallback<>());
+            new RestServiceCallback<>());
     return new JsonResponse(Status.OK, "").build();
   }
 
@@ -697,7 +716,8 @@ public class NotebookRestApi extends AbstractRestApi {
                               @QueryParam("blocking") Boolean blocking,
                               @QueryParam("isolated") Boolean isolated,
                               String message)
-      throws IOException, IllegalArgumentException {
+      throws Exception, IllegalArgumentException {
+
     if (blocking == null) {
       blocking = false;
     }
@@ -719,13 +739,8 @@ public class NotebookRestApi extends AbstractRestApi {
     checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
 
     //TODO(zjffdu), can we run a note via rest api when cron is enabled ?
-    try {
-      note.runAll(subject, blocking, isolated, params);
-      return new JsonResponse<>(Status.OK).build();
-    } catch (Exception ex) {
-      LOGGER.error("Exception from run", ex);
-      return new JsonResponse<>(Status.EXPECTATION_FAILED, ex.getMessage()).build();
-    }
+    note.runAll(subject, blocking, isolated, params);
+    return new JsonResponse<>(Status.OK).build();
   }
 
   /**
@@ -741,6 +756,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response stopNoteJobs(@PathParam("noteId") String noteId)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Stop note jobs {} ", noteId);
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
@@ -767,6 +783,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getNoteJobStatus(@PathParam("noteId") String noteId)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Get note job status.");
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
@@ -790,6 +807,7 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response getNoteParagraphJobStatus(@PathParam("noteId") String noteId,
                                             @PathParam("paragraphId") String paragraphId)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Get note paragraph job status.");
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
@@ -814,8 +832,11 @@ public class NotebookRestApi extends AbstractRestApi {
   @Path("job/{noteId}/{paragraphId}")
   @ZeppelinApi
   public Response runParagraph(@PathParam("noteId") String noteId,
-                               @PathParam("paragraphId") String paragraphId, String message)
+                               @PathParam("paragraphId") String paragraphId,
+                               @QueryParam("sessionId") String sessionId,
+                               String message)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Run paragraph job asynchronously {} {} {}", noteId, paragraphId, message);
 
     Note note = notebook.getNote(noteId);
@@ -826,11 +847,11 @@ public class NotebookRestApi extends AbstractRestApi {
     Map<String, Object> params = new HashMap<>();
     if (!StringUtils.isEmpty(message)) {
       ParametersRequest request =
-          ParametersRequest.fromJson(message);
+              ParametersRequest.fromJson(message);
       params = request.getParams();
     }
     notebookService.runParagraph(noteId, paragraphId, paragraph.getTitle(),
-            paragraph.getText(), params, new HashMap<>(),
+            paragraph.getText(), params, new HashMap<>(), sessionId,
             false, false, getServiceContext(), new RestServiceCallback<>());
     return new JsonResponse<>(Status.OK).build();
   }
@@ -851,6 +872,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response runParagraphSynchronously(@PathParam("noteId") String noteId,
                                             @PathParam("paragraphId") String paragraphId,
+                                            @QueryParam("sessionId") String sessionId,
                                             String message)
       throws IOException, IllegalArgumentException {
     LOGGER.info("Run paragraph synchronously {} {} {}", noteId, paragraphId, message);
@@ -863,21 +885,17 @@ public class NotebookRestApi extends AbstractRestApi {
     Map<String, Object> params = new HashMap<>();
     if (!StringUtils.isEmpty(message)) {
       ParametersRequest request =
-          ParametersRequest.fromJson(message);
+              ParametersRequest.fromJson(message);
       params = request.getParams();
     }
 
     if (notebookService.runParagraph(noteId, paragraphId, paragraph.getTitle(),
-        paragraph.getText(), params,
-        new HashMap<>(), false, true, getServiceContext(), new RestServiceCallback<>())) {
+            paragraph.getText(), params,
+            new HashMap<>(), sessionId, false, true, getServiceContext(), new RestServiceCallback<>())) {
       note = notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback<>());
       Paragraph p = note.getParagraph(paragraphId);
       InterpreterResult result = p.getReturn();
-      if (result.code() == InterpreterResult.Code.SUCCESS) {
-        return new JsonResponse<>(Status.OK, result).build();
-      } else {
-        return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, result).build();
-      }
+      return new JsonResponse<>(Status.OK, result).build();
     } else {
       return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, "Fail to run paragraph").build();
     }
@@ -900,7 +918,7 @@ public class NotebookRestApi extends AbstractRestApi {
       throws IOException, IllegalArgumentException {
     LOGGER.info("stop paragraph job {} ", noteId);
     notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(),
-        new RestServiceCallback<Paragraph>());
+            new RestServiceCallback<Paragraph>());
     return new JsonResponse<>(Status.OK).build();
   }
 
@@ -917,6 +935,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response registerCronJob(@PathParam("noteId") String noteId, String message)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Register cron job note={} request cron msg={}", noteId, message);
 
     CronRequest request = CronRequest.fromJson(message);
@@ -952,12 +971,13 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response removeCronJob(@PathParam("noteId") String noteId)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Remove cron job note {}", noteId);
 
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
     checkIfUserIsOwner(noteId,
-        "Insufficient privileges you cannot remove this cron job from this note");
+            "Insufficient privileges you cannot remove this cron job from this note");
     checkIfNoteSupportsCron(note);
 
     Map<String, Object> config = note.getConfig();
@@ -982,6 +1002,7 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getCronJob(@PathParam("noteId") String noteId)
       throws IOException, IllegalArgumentException {
+
     LOGGER.info("Get cron job note {}", noteId);
 
     Note note = notebook.getNote(noteId);
@@ -1008,7 +1029,7 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response getJobListforNote() throws IOException, IllegalArgumentException {
     LOGGER.info("Get note jobs for job manager");
     List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
-        .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
+            .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
     Map<String, Object> response = new HashMap<>();
     response.put("lastResponseUnixTime", System.currentTimeMillis());
     response.put("jobs", noteJobs);
@@ -1031,8 +1052,8 @@ public class NotebookRestApi extends AbstractRestApi {
       throws IOException, IllegalArgumentException {
     LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
     List<JobManagerService.NoteJobInfo> noteJobs =
-        jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
-            new RestServiceCallback<>());
+            jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
+                    new RestServiceCallback<>());
     Map<String, Object> response = new HashMap<>();
     response.put("lastResponseUnixTime", System.currentTimeMillis());
     response.put("jobs", noteJobs);
@@ -1057,9 +1078,9 @@ public class NotebookRestApi extends AbstractRestApi {
       String[] ids = notesFound.get(i).get("id").split("/", 2);
       String noteId = ids[0];
       if (!authorizationService.isOwner(noteId, userAndRoles) &&
-          !authorizationService.isReader(noteId, userAndRoles) &&
-          !authorizationService.isWriter(noteId, userAndRoles) &&
-          !authorizationService.isRunner(noteId, userAndRoles)) {
+              !authorizationService.isReader(noteId, userAndRoles) &&
+              !authorizationService.isWriter(noteId, userAndRoles) &&
+              !authorizationService.isRunner(noteId, userAndRoles)) {
         notesFound.remove(i);
         i--;
       }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
new file mode 100644
index 0000000..342374f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.rest;
+
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.SessionInfo;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * Backend manager of ZSessions
+ */
+public class SessionManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+  private static final int RETRY = 3;
+  private Map<String, SessionInfo> sessions = new HashMap<>();
+  private InterpreterSettingManager interpreterSettingManager;
+  private Notebook notebook;
+
+  public SessionManager(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
+    this.notebook = notebook;
+    this.interpreterSettingManager = interpreterSettingManager;
+  }
+
+  /**
+   * Create a new session, including allocate new session Id and create dedicated note for this session.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public synchronized SessionInfo createSession(String interpreter) throws Exception {
+    String sessionId = null;
+    int i = 0;
+    while (i < RETRY) {
+      sessionId = interpreter + "_" + System.currentTimeMillis();
+       if (sessions.containsKey(sessionId)) {
+         try {
+           Thread.sleep(1);
+         } catch (InterruptedException e) {
+           e.printStackTrace();
+         }
+       } else {
+         break;
+       }
+    }
+
+    if (sessionId == null) {
+      throw new Exception("Unable to generate session id");
+    }
+
+    Note sessionNote = notebook.createNote("/_ZSession/" + interpreter + "/" + sessionId, AuthenticationInfo.ANONYMOUS);
+    SessionInfo sessionInfo = new SessionInfo(sessionId, sessionNote.getId(), interpreter);
+    sessions.put(sessionId, sessionInfo);
+    return sessionInfo;
+  }
+
+  /**
+   * Remove and stop this session.
+   *
+   * @param sessionId
+   */
+  public void removeSession(String sessionId) {
+    this.sessions.remove(sessionId);
+    InterpreterGroup interpreterGroup = this.interpreterSettingManager.getInterpreterGroupById(sessionId);
+    if (interpreterGroup == null) {
+      LOGGER.info("No interpreterGroup for session: " + sessionId);
+      return;
+    }
+    ((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().closeInterpreters(sessionId);
+  }
+
+  /**
+   * Get the sessionInfo.
+   * It method will also update its state if these's associated interpreter process.
+   *
+   * @param sessionId
+   * @return
+   * @throws Exception
+   */
+  public SessionInfo getSession(String sessionId) throws Exception {
+    SessionInfo sessionInfo = sessions.get(sessionId);
+    if (sessionInfo == null) {
+      LOGGER.warn("No such session: " + sessionId);
+      return null;
+    }
+    InterpreterGroup interpreterGroup = this.interpreterSettingManager.getInterpreterGroupById(sessionId);
+    if (interpreterGroup != null) {
+      RemoteInterpreterProcess remoteInterpreterProcess =
+              ((ManagedInterpreterGroup) interpreterGroup).getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        sessionInfo.setState("Ready");
+      } else if (remoteInterpreterProcess != null) {
+        sessionInfo.setStartTime(remoteInterpreterProcess.getStartTime());
+        sessionInfo.setWeburl(interpreterGroup.getWebUrl());
+        if (remoteInterpreterProcess.isRunning()) {
+          sessionInfo.setState("Running");
+        } else {
+          sessionInfo.setState("Stopped");
+        }
+      }
+    }
+
+    return sessionInfo;
+  }
+
+  /**
+   * Get all sessions.
+   *
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> getAllSessions() throws Exception {
+    List<SessionInfo> sessionList = new ArrayList<>();
+    for (String sessionId : sessions.keySet()) {
+      SessionInfo session = getSession(sessionId);
+      if (session != null) {
+        sessionList.add(session);
+      }
+    }
+    return sessionList;
+  }
+
+  /**
+   * Get all sessions for provided interpreter.
+   *
+   * @param interpreter
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> getAllSessions(String interpreter) throws Exception {
+    List<SessionInfo> sessionList = new ArrayList<>();
+    for (String sessionId : sessions.keySet()) {
+      SessionInfo status = getSession(sessionId);
+      if (status != null && interpreter.equals(status.getInterpreter())) {
+        sessionList.add(status);
+      }
+    }
+    return sessionList;
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
new file mode 100644
index 0000000..8c5b164
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.SessionInfo;
+import org.apache.zeppelin.server.JsonResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import java.util.List;
+
+/**
+ * Rest api endpoint for the ZSession.
+ */
+@Path("/session")
+@Produces("application/json")
+@Singleton
+public class SessionRestApi {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionRestApi.class);
+
+  private SessionManager sessionManager;
+
+  @Inject
+  public SessionRestApi(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
+    this.sessionManager = new SessionManager(notebook, interpreterSettingManager);
+  }
+
+  @GET
+  @Path("/")
+  public Response listSessions(@QueryParam("interpreter") String interpreter) throws Exception {
+    if (StringUtils.isBlank(interpreter)) {
+      LOGGER.info("List all sessions");
+    } else {
+      LOGGER.info("List all sessions for interpreter: " + interpreter);
+    }
+    List<SessionInfo> sessionList = null;
+    if (StringUtils.isBlank(interpreter)) {
+      sessionList = sessionManager.getAllSessions();
+    } else {
+      sessionList = sessionManager.getAllSessions(interpreter);
+    }
+    return new JsonResponse<>(Response.Status.OK, sessionList).build();
+  }
+
+  @POST
+  @Path("/")
+  public Response createSession(@QueryParam("interpreter") String interpreter) throws Exception {
+    LOGGER.info("Create new session for interpreter: {}", interpreter);
+    SessionInfo sessionInfo = sessionManager.createSession(interpreter);
+    return new JsonResponse<>(Response.Status.OK, sessionInfo).build();
+  }
+
+  @DELETE
+  @Path("{sessionId}")
+  public Response stopSession(@PathParam("sessionId") String sessionId) {
+    LOGGER.info("Stop session: {}", sessionId);
+    sessionManager.removeSession(sessionId);
+    return new JsonResponse<>(Response.Status.OK, sessionId).build();
+  }
+
+  /**
+   * Get session info for provided sessionId.
+   */
+  @GET
+  @Path("{sessionId}")
+  @ZeppelinApi
+  public Response getSession(@PathParam("sessionId") String sessionId) throws Exception {
+    SessionInfo session = sessionManager.getSession(sessionId);
+    if (session == null) {
+      return new JsonResponse<>(Response.Status.NOT_FOUND).build();
+    } else {
+      return new JsonResponse<>(Response.Status.OK, "", session).build();
+    }
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
index 79e3fe0..5615f87 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
@@ -23,10 +23,11 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
 import javax.ws.rs.ext.Provider;
+
 import org.apache.zeppelin.rest.message.gson.ExceptionSerializer;
 
 @Provider
-public class WebApplicationExceptionMapper implements ExceptionMapper<WebApplicationException> {
+public class WebApplicationExceptionMapper implements ExceptionMapper<Throwable> {
   private final Gson gson;
 
   public WebApplicationExceptionMapper() {
@@ -37,8 +38,11 @@ public class WebApplicationExceptionMapper implements ExceptionMapper<WebApplica
   }
 
   @Override
-  public Response toResponse(WebApplicationException exception) {
-    return Response.status(exception.getResponse().getStatus())
-        .entity(gson.toJson(exception)).build();
+  public Response toResponse(Throwable exception) {
+    if (exception instanceof WebApplicationException) {
+      return ((WebApplicationException) exception).getResponse();
+    } else {
+      return Response.status(500).entity(gson.toJson(exception)).build();
+    }
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
index 9cf1df1..7d6a3a6 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
@@ -26,10 +26,12 @@ import org.apache.zeppelin.common.JsonSerializable;
  *  NewNoteRequest rest api request message.
  */
 public class NewNoteRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
+  private static final Gson GSON = new Gson();
 
-  String name;
-  List<NewParagraphRequest> paragraphs;
+  //TODO(zjffdu) rename it to be notePath instead of name
+  private String name;
+  private String defaultInterpreterGroup;
+  private List<NewParagraphRequest> paragraphs;
 
   public NewNoteRequest (){
   }
@@ -38,15 +40,19 @@ public class NewNoteRequest implements JsonSerializable {
     return name;
   }
 
+  public String getDefaultInterpreterGroup() {
+    return defaultInterpreterGroup;
+  }
+
   public List<NewParagraphRequest> getParagraphs() {
     return paragraphs;
   }
 
   public String toJson() {
-    return gson.toJson(this);
+    return GSON.toJson(this);
   }
 
   public static NewNoteRequest fromJson(String json) {
-    return gson.fromJson(json, NewNoteRequest.class);
+    return GSON.fromJson(json, NewNoteRequest.class);
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
index 8a6d0d0..f82cb9e 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
@@ -24,9 +24,9 @@ import org.apache.zeppelin.common.JsonSerializable;
  * RestartInterpreter rest api request message.
  */
 public class RestartInterpreterRequest implements JsonSerializable {
-  private static final Gson gson = new Gson();
+  private static final Gson GSON = new Gson();
 
-  String noteId;
+  private String noteId;
 
   public RestartInterpreterRequest() {
   }
@@ -36,10 +36,10 @@ public class RestartInterpreterRequest implements JsonSerializable {
   }
 
   public String toJson() {
-    return gson.toJson(this);
+    return GSON.toJson(this);
   }
 
   public static RestartInterpreterRequest fromJson(String json) {
-    return gson.fromJson(json, RestartInterpreterRequest.class);
+    return GSON.fromJson(json, RestartInterpreterRequest.class);
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java
new file mode 100644
index 0000000..8e8f2ab
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+/**
+ * (TODO) Move it to zeppelin-common, so that we don't need to have 2 copies.
+ */
+public class SessionInfo {
+
+  private String sessionId;
+  private String noteId;
+  private String interpreter;
+  private String state;
+  private String weburl;
+  private String startTime;
+
+  public SessionInfo(String sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  public SessionInfo(String sessionId, String noteId, String interpreter) {
+    this.sessionId = sessionId;
+    this.noteId = noteId;
+    this.interpreter = interpreter;
+  }
+
+  public SessionInfo(String sessionId, String noteId, String interpreter, String state, String weburl, String startTime) {
+    this.sessionId = sessionId;
+    this.noteId = noteId;
+    this.interpreter = interpreter;
+    this.state = state;
+    this.weburl = weburl;
+    this.startTime = startTime;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public String getInterpreter() {
+    return interpreter;
+  }
+
+  public String getWeburl() {
+    return weburl;
+  }
+
+  public String getStartTime() {
+    return startTime;
+  }
+
+  public void setState(String state) {
+    this.state = state;
+  }
+
+  public void setStartTime(String startTime) {
+    this.startTime = startTime;
+  }
+
+  public void setWeburl(String weburl) {
+    this.weburl = weburl;
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
index ac654c0..78e6101 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
@@ -21,6 +21,8 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import java.lang.reflect.Type;
 import javax.ws.rs.WebApplicationException;
 
@@ -32,6 +34,7 @@ public class ExceptionSerializer implements JsonSerializer<Exception> {
     JsonObject jsonObject = new JsonObject();
     jsonObject.addProperty("exception", e.getClass().getSimpleName());
     jsonObject.addProperty("message", e.getMessage());
+    jsonObject.addProperty("stacktrace", ExceptionUtils.getStackTrace(e));
 
     if (e instanceof WebApplicationException) {
       jsonObject.addProperty("status", ((WebApplicationException) e).getResponse().getStatus());
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index a1b27b5..3763345 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -22,8 +22,6 @@ package org.apache.zeppelin.service;
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
 
 import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -35,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import javax.inject.Inject;
 
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
@@ -52,6 +49,7 @@ import org.apache.zeppelin.notebook.AuthorizationService;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
 import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.rest.SessionManager;
 import org.apache.zeppelin.rest.exception.BadRequestException;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.rest.exception.NoteNotFoundException;
@@ -136,8 +134,19 @@ public class NotebookService {
   }
 
 
+  /**
+   *
+   * @param notePath
+   * @param defaultInterpreterGroup
+   * @param addingEmptyParagraph
+   * @param context
+   * @param callback
+   * @return
+   * @throws IOException
+   */
   public Note createNote(String notePath,
                          String defaultInterpreterGroup,
+                         boolean addingEmptyParagraph,
                          ServiceContext context,
                          ServiceCallback<Note> callback) throws IOException {
 
@@ -150,7 +159,9 @@ public class NotebookService {
       Note note = notebook.createNote(normalizeNotePath(notePath), defaultInterpreterGroup,
           context.getAutheInfo(), false);
       // it's an empty note. so add one paragraph
-      note.addNewParagraph(context.getAutheInfo());
+      if (addingEmptyParagraph) {
+        note.addNewParagraph(context.getAutheInfo());
+      }
       notebook.saveNote(note, context.getAutheInfo());
       callback.onSuccess(note, context);
       return note;
@@ -304,6 +315,7 @@ public class NotebookService {
                               String text,
                               Map<String, Object> params,
                               Map<String, Object> config,
+                              String sessionId,
                               boolean failIfDisabled,
                               boolean blocking,
                               ServiceContext context,
@@ -353,7 +365,7 @@ public class NotebookService {
 
     try {
       notebook.saveNote(note, context.getAutheInfo());
-      note.run(p.getId(), blocking, context.getAutheInfo().getUser());
+      note.run(p.getId(), sessionId, blocking, context.getAutheInfo().getUser());
       callback.onSuccess(p, context);
       return true;
     } catch (Exception ex) {
@@ -409,7 +421,7 @@ public class NotebookService {
             Map<String, Object> params = (Map<String, Object>) raw.get("params");
             Map<String, Object> config = (Map<String, Object>) raw.get("config");
 
-            if (!runParagraph(noteId, paragraphId, title, text, params, config, false, true,
+            if (!runParagraph(noteId, paragraphId, title, text, params, config, null, false, true,
                     context, callback)) {
               // stop execution when one paragraph fails.
               return false;
@@ -619,6 +631,37 @@ public class NotebookService {
     callback.onSuccess(p, context);
   }
 
+  public Paragraph getNextSessionParagraph(String noteId,
+                                        int maxParagraph,
+                                        ServiceContext context,
+                                        ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_CLEAR_OUTPUT, context,
+            callback)) {
+      throw new IOException("No privilege to access this note");
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      throw new IOException("No such note");
+    }
+    if (note.getParagraphCount() < maxParagraph) {
+      return note.addNewParagraph(context.getAutheInfo());
+    } else {
+      boolean removed = false;
+      for (int i = 1; i< note.getParagraphCount(); ++i) {
+        if (note.getParagraph(i).getStatus().isCompleted()) {
+          note.removeParagraph(context.getAutheInfo().getUser(), note.getParagraph(i).getId());
+          removed = true;
+          break;
+        }
+      }
+      if (!removed) {
+        throw new IOException("All the paragraphs are not completed, unable to find available paragraph");
+      }
+      return note.addNewParagraph(context.getAutheInfo());
+    }
+  }
+
   public void clearParagraphOutput(String noteId,
                                    String paragraphId,
                                    ServiceContext context,
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 2db1d26..0a16c21 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -962,7 +962,7 @@ public class NotebookServer extends WebSocketServlet
     String noteName = (String) message.get("name");
     String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup");
 
-    getNotebookService().createNote(noteName, defaultInterpreterGroup, getServiceContext(message),
+    getNotebookService().createNote(noteName, defaultInterpreterGroup, true, getServiceContext(message),
         new WebSocketServiceCallback<Note>(conn) {
           @Override
           public void onSuccess(Note note, ServiceContext context) throws IOException {
@@ -1516,7 +1516,7 @@ public class NotebookServer extends WebSocketServlet
     String title = (String) fromMessage.get("title");
     Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
     Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
-    getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config,
+    getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, null,
         false, false, getServiceContext(fromMessage),
         new WebSocketServiceCallback<Paragraph>(conn) {
           @Override
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
index 7033929..6a9c8a8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
@@ -35,6 +35,7 @@ public class NotebookWebSocketCreator implements WebSocketCreator {
   public NotebookWebSocketCreator(NotebookServer notebookServer) {
     this.notebookServer = notebookServer;
   }
+
   public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
     String origin = request.getHeader("Origin");
     if (notebookServer.checkOrigin(request.getHttpServletRequest(), origin)) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 9ebd83c..b6c626d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -186,6 +186,7 @@ public abstract class AbstractTestRestApi {
       zeppelinHome = new File("..");
       LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
       confDir = new File(zeppelinHome, "conf_" + testClassName);
+      FileUtils.deleteDirectory(confDir);
       LOG.info("ZEPPELIN_CONF_DIR: " + confDir.getAbsolutePath());
       confDir.mkdirs();
 
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
index 6039bf2..885a78d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
@@ -107,7 +107,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
   }
 
   @Test
-  public void testRunParagraphJob() throws IOException {
+  public void testRunParagraphJob() throws Exception {
     LOG.info("Running testRunParagraphJob");
     Note note1 = null;
     try {
@@ -117,23 +117,25 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       Paragraph p = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
       // run blank paragraph
-      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
       assertThat(post, isAllowed());
       Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
+      p.waitUntilFinished();
       assertEquals(p.getStatus(), Job.Status.FINISHED);
 
       // run non-blank paragraph
       p.setText("test");
-      post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
+      post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
       assertThat(post, isAllowed());
       resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
       assertEquals(resp.get("status"), "OK");
       post.releaseConnection();
-      assertNotEquals(p.getStatus(), Job.Status.READY);
+      p.waitUntilFinished();
+      assertNotEquals(p.getStatus(), Job.Status.FINISHED);
     } finally {
       // cleanup
       if (null != note1) {
@@ -176,10 +178,10 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p.setText(text);
 
       post = httpPost("/notebook/run/" + note1.getId() + "/" + p.getId(), "");
-      assertEquals(500, post.getStatusCode());
+      assertEquals(200, post.getStatusCode());
       resp = gson.fromJson(post.getResponseBodyAsString(),
               new TypeToken<Map<String, Object>>() {}.getType());
-      assertEquals("INTERNAL_SERVER_ERROR", resp.get("status"));
+      assertEquals("OK", resp.get("status"));
       StringMap stringMap = (StringMap) resp.get("body");
       assertEquals("ERROR", stringMap.get("code"));
       List<StringMap> interpreterResults = (List<StringMap>) stringMap.get("msg");
@@ -470,12 +472,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
       p2.setText("%python user2='abc'\nprint(user2)");
 
       PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
-      assertThat(post, isExpectationFailed());
-      Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
-              new TypeToken<Map<String, Object>>() {}.getType());
-      assertEquals(resp.get("status"), "EXPECTATION_FAILED");
-      assertTrue(resp.get("message").toString().contains("Fail to run note because paragraph"));
-      post.releaseConnection();
+      assertThat(post, isAllowed());
 
       assertEquals(Job.Status.ERROR, p1.getStatus());
       // p2 will be skipped because p1 is failed.
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
index b949d05..a3d67c8 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
@@ -58,7 +58,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
   
   @Test
   public void testThatUserCanCreateAndRemoveNote() throws IOException {
-    String noteId = createNoteForUser("test", "admin", "password1");
+    String noteId = createNoteForUser("test_1", "admin", "password1");
     assertNotNull(noteId);
     String id = getNoteIdForUser(noteId, "admin", "password1");
     assertThat(id, is(noteId));
@@ -67,7 +67,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
   
   @Test
   public void testThatOtherUserCanAccessNoteIfPermissionNotSet() throws IOException {
-    String noteId = createNoteForUser("test", "admin", "password1");
+    String noteId = createNoteForUser("test_2", "admin", "password1");
     
     userTryGetNote(noteId, "user1", "password2", isAllowed());
     
@@ -76,7 +76,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
   
   @Test
   public void testThatOtherUserCannotAccessNoteIfPermissionSet() throws IOException {
-    String noteId = createNoteForUser("test", "admin", "password1");
+    String noteId = createNoteForUser("test_3", "admin", "password1");
     
     //set permission
     String payload = "{ \"owners\": [\"admin\"], \"readers\": [\"user2\"], " +
@@ -94,7 +94,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
   
   @Test
   public void testThatWriterCannotRemoveNote() throws IOException {
-    String noteId = createNoteForUser("test", "admin", "password1");
+    String noteId = createNoteForUser("test_4", "admin", "password1");
     
     //set permission
     String payload = "{ \"owners\": [\"admin\", \"user1\"], \"readers\": [\"user2\"], " +
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 6e87196..96ef8ee 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -173,7 +173,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
       expectedNoteName = "Note " + newNoteId;
     }
     assertEquals("compare note name", expectedNoteName, newNoteName);
-    assertEquals("initial paragraph check failed", 4, newNote.getParagraphs().size());
+    assertEquals("initial paragraph check failed", 3, newNote.getParagraphs().size());
     for (Paragraph p : newNote.getParagraphs()) {
       if (StringUtils.isEmpty(p.getText())) {
         continue;
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index a463617..4c2156a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -155,14 +155,14 @@ public class NotebookServiceTest {
     verify(callback).onSuccess(homeNote, context);
 
     // create note
-    Note note1 = notebookService.createNote("/folder_1/note1", "test", context, callback);
+    Note note1 = notebookService.createNote("/folder_1/note1", "test", true, context, callback);
     assertEquals("note1", note1.getName());
     assertEquals(1, note1.getParagraphCount());
     verify(callback).onSuccess(note1, context);
 
     // create duplicated note
     reset(callback);
-    Note note2 = notebookService.createNote("/folder_1/note1", "test", context, callback);
+    Note note2 = notebookService.createNote("/folder_1/note1", "test", true, context, callback);
     assertNull(note2);
     ArgumentCaptor<Exception> exception = ArgumentCaptor.forClass(Exception.class);
     verify(callback).onFailure(exception.capture(), any(ServiceContext.class));
@@ -203,7 +203,7 @@ public class NotebookServiceTest {
     assertEquals("/folder_4/new_name", notesInfo.get(0).getPath());
 
     // create another note
-    note2 = notebookService.createNote("/note2", "test", context, callback);
+    note2 = notebookService.createNote("/note2", "test", true, context, callback);
     assertEquals("note2", note2.getName());
     verify(callback).onSuccess(note2, context);
 
@@ -334,18 +334,18 @@ public class NotebookServiceTest {
   @Test
   public void testParagraphOperations() throws IOException {
     // create note
-    Note note1 = notebookService.createNote("note1", "python", context, callback);
+    Note note1 = notebookService.createNote("note1", "python", false, context, callback);
     assertEquals("note1", note1.getName());
-    assertEquals(1, note1.getParagraphCount());
+    assertEquals(0, note1.getParagraphCount());
     verify(callback).onSuccess(note1, context);
 
     // add paragraph
     reset(callback);
-    Paragraph p = notebookService.insertParagraph(note1.getId(), 1, new HashMap<>(), context,
+    Paragraph p = notebookService.insertParagraph(note1.getId(), 0, new HashMap<>(), context,
         callback);
     assertNotNull(p);
     verify(callback).onSuccess(p, context);
-    assertEquals(2, note1.getParagraphCount());
+    assertEquals(1, note1.getParagraphCount());
 
     // update paragraph
     reset(callback);
@@ -365,7 +365,7 @@ public class NotebookServiceTest {
     p.getConfig().put("colWidth", "6.0");
     p.getConfig().put("title", true);
     boolean runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "1+1",
-        new HashMap<>(), new HashMap<>(), false, false, context, callback);
+        new HashMap<>(), new HashMap<>(), null, false, false, context, callback);
     assertTrue(runStatus);
     verify(callback).onSuccess(p, context);
     assertEquals(2, p.getConfig().size());
@@ -373,7 +373,7 @@ public class NotebookServiceTest {
     // run paragraph synchronously via correct code
     reset(callback);
     runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "1+1",
-        new HashMap<>(), new HashMap<>(), false, true, context, callback);
+        new HashMap<>(), new HashMap<>(), null, false, true, context, callback);
     assertTrue(runStatus);
     verify(callback).onSuccess(p, context);
     assertEquals(2, p.getConfig().size());
@@ -387,7 +387,7 @@ public class NotebookServiceTest {
 
     reset(callback);
     runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "invalid_code",
-        new HashMap<>(), new HashMap<>(), false, true, context, callback);
+        new HashMap<>(), new HashMap<>(), null, false, true, context, callback);
     assertTrue(runStatus);
     // TODO(zjffdu) Enable it after ZEPPELIN-3699
     // assertNotNull(p.getResult());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 1a2e05e..206236b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -419,6 +419,10 @@ public class InterpreterSetting {
   }
 
   private String getInterpreterGroupId(ExecutionContext executionContext) {
+    if (!StringUtils.isBlank(executionContext.getInterpreterGroupId())) {
+      return executionContext.getInterpreterGroupId();
+    }
+
     if (executionContext.isInIsolatedMode()) {
       return name + "-isolated-" + executionContext.getNoteId() + "-" +
               executionContext.getStartTime();
@@ -531,6 +535,10 @@ public class InterpreterSetting {
     closeInterpreters(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
   }
 
+  public void closeInterpreters(String interpreterGroupId) {
+    closeInterpreters(new ExecutionContextBuilder().setInterpreterGroupId(interpreterGroupId).createExecutionContext());
+  }
+
   public void closeInterpreters(ExecutionContext executionContext) {
     ManagedInterpreterGroup interpreterGroup = getInterpreterGroup(executionContext);
     if (interpreterGroup != null) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 5f67889..819f02b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -340,8 +340,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
         }.getType());
     String noteId = paraInfos.get("noteId");
     String paraId = paraInfos.get("paraId");
-    String settingId = RemoteInterpreterUtils.
-        getInterpreterSettingId(interpreterGroup.getId());
+    String settingId = ((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().getId();
     if (noteId != null && paraId != null && settingId != null) {
       listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
     }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 616bfc3..209d5dc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
 /**
  * Abstract class for interpreter process
@@ -34,11 +36,14 @@ import java.io.IOException;
 public abstract class RemoteInterpreterProcess implements InterpreterClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
   private static final Gson GSON = new Gson();
+  private static final SimpleDateFormat START_TIME_FORMATTER =
+          new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
   private int connectTimeout;
   protected String intpEventServerHost;
   protected int intpEventServerPort;
   private PooledRemoteClient<Client> remoteClient;
+  private String startTime;
 
   public RemoteInterpreterProcess(int connectTimeout,
                                   int connectionPoolSize,
@@ -47,6 +52,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     this.connectTimeout = connectTimeout;
     this.intpEventServerHost = intpEventServerHost;
     this.intpEventServerPort = intpEventServerPort;
+    this.startTime = START_TIME_FORMATTER.format(new Date());
     this.remoteClient = new PooledRemoteClient<Client>(() -> {
       TSocket transport = new TSocket(getHost(), getPort());
       try {
@@ -63,6 +69,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     return connectTimeout;
   }
 
+  public String getStartTime() {
+    return startTime;
+  }
+
   public void shutdown() {
     if (remoteClient != null) {
       remoteClient.shutdown();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index a55567b..1a1c08a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -788,8 +788,7 @@ public class Note implements JsonSerializable {
           // Must run each paragraph in blocking way.
           if (!run(p.getId(), true)) {
             LOGGER.warn("Skip running the remain notes because paragraph {} fails", p.getId());
-            throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, result: " +
-                    p.getReturn());
+            return;
           }
         } catch (InterpreterNotFoundException e) {
           // ignore, because the following run method will fail if interpreter not found.
@@ -834,7 +833,7 @@ public class Note implements JsonSerializable {
    * @param blocking Whether run this paragraph in blocking way
    */
   public boolean run(String paragraphId, boolean blocking) {
-    return run(paragraphId, blocking, null);
+    return run(paragraphId, null, blocking, null);
   }
 
   /**
@@ -846,6 +845,7 @@ public class Note implements JsonSerializable {
    * @return
    */
   public boolean run(String paragraphId,
+                     String interpreterGroupId,
                      boolean blocking,
                      String ctxUser) {
     Paragraph p = getParagraph(paragraphId);
@@ -854,7 +854,7 @@ public class Note implements JsonSerializable {
       p = p.getUserParagraph(ctxUser);
 
     p.setListener(this.paragraphJobListener);
-    return p.execute(blocking);
+    return p.execute(interpreterGroupId, blocking);
   }
 
   /**
@@ -994,12 +994,10 @@ public class Note implements JsonSerializable {
   public List<InterpreterSetting> getUsedInterpreterSettings() {
     Set<InterpreterSetting> settings = new HashSet<>();
     for (Paragraph p : getParagraphs()) {
-      try {
-        Interpreter intp = p.getBindedInterpreter();
+      Interpreter intp = p.getInterpreter();
+      if (intp != null) {
         settings.add((
                 (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting());
-      } catch (InterpreterNotFoundException e) {
-        // ignore this
       }
     }
     return new ArrayList<>(settings);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 336be44..689ecfd 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -81,6 +81,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   private String text;
   private String user;
   private Date dateUpdated;
+  private int progress;
   // paragraph configs like isOpen, colWidth, etc
   private Map<String, Object> config = new HashMap<>();
   // form and parameter settings
@@ -93,6 +94,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   private transient String intpText;
   private transient String scriptText;
   private transient Interpreter interpreter;
+  private transient String interpreterGroupId;
   private transient Note note;
   private transient AuthenticationInfo subject;
   // personalized
@@ -248,15 +250,21 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
             .setDefaultInterpreterGroup(note.getDefaultInterpreterGroup())
             .setInIsolatedMode(note.isIsolatedMode())
             .setStartTime(note.getStartTime())
+            .setInterpreterGroupId(interpreterGroupId)
             .createExecutionContext();
 
     return this.note.getInterpreterFactory().getInterpreter(intpText, executionContext);
   }
 
+  @VisibleForTesting
   public void setInterpreter(Interpreter interpreter) {
     this.interpreter = interpreter;
   }
 
+  public Interpreter getInterpreter() {
+    return interpreter;
+  }
+
   public List<InterpreterCompletion> completion(String buffer, int cursor) {
     setText(buffer);
     try {
@@ -297,7 +305,8 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   public int progress() {
     try {
       if (this.interpreter != null) {
-        return this.interpreter.getProgress(getInterpreterContext());
+        this.progress = this.interpreter.getProgress(getInterpreterContext());
+        return this.progress;
       } else {
         return 0;
       }
@@ -319,13 +328,18 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     return checkEmptyConfig && Strings.isNullOrEmpty(scriptText) && localProperties.isEmpty();
   }
 
+  public boolean execute(boolean blocking) {
+    return execute(null, blocking);
+  }
+
   /**
    * Return true only when paragraph run successfully with state of FINISHED.
    * @param blocking
    * @return
    */
-  public boolean execute(boolean blocking) {
+  public boolean execute(String interpreterGroupId, boolean blocking) {
     try {
+      this.interpreterGroupId = interpreterGroupId;
       this.interpreter = getBindedInterpreter();
       InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
               interpreter.getInterpreterGroup()).getInterpreterSetting();
@@ -374,6 +388,15 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   }
 
   @Override
+  public void setStatus(Status status) {
+    super.setStatus(status);
+    // reset interpreterGroupId when paragraph is completed.
+    if (status.isCompleted()) {
+      this.interpreterGroupId = null;
+    }
+  }
+
+  @Override
   protected InterpreterResult jobRun() throws Throwable {
     try {
       if (localProperties.getOrDefault("isRecover", "false").equals("false")) {